ports_unittest.cc (53595B)
1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include <inttypes.h> 6 #include <stdio.h> 7 8 #include <map> 9 #include <utility> 10 11 #include "base/logging.h" 12 #include "base/waitable_event.h" 13 #include "base/thread.h" 14 #include "base/string_util.h" 15 #include "mojo/core/ports/event.h" 16 #include "mojo/core/ports/node.h" 17 #include "mojo/core/ports/node_delegate.h" 18 #include "mojo/core/ports/port_locker.h" 19 #include "mojo/core/ports/user_message.h" 20 #include "testing/gtest/include/gtest/gtest.h" 21 22 #include "mozilla/Mutex.h" 23 24 namespace mojo { 25 namespace core { 26 namespace ports { 27 namespace test { 28 29 namespace { 30 31 // TODO(rockot): Remove this unnecessary alias. 32 using ScopedMessage = mozilla::UniquePtr<UserMessageEvent>; 33 34 class TestMessage : public UserMessage { 35 public: 36 static const TypeInfo kUserMessageTypeInfo; 37 38 explicit TestMessage(const std::string& payload) 39 : UserMessage(&kUserMessageTypeInfo), payload_(payload) {} 40 ~TestMessage() override = default; 41 42 const std::string& payload() const { return payload_; } 43 44 private: 45 std::string payload_; 46 }; 47 48 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {}; 49 50 ScopedMessage NewUserMessageEvent(const std::string& payload, 51 size_t num_ports) { 52 auto event = mozilla::MakeUnique<UserMessageEvent>(num_ports); 53 event->AttachMessage(mozilla::MakeUnique<TestMessage>(payload)); 54 return event; 55 } 56 57 bool MessageEquals(const ScopedMessage& message, const std::string& s) { 58 return message->GetMessage<TestMessage>()->payload() == s; 59 } 60 61 class TestNode; 62 63 class MessageRouter { 64 public: 65 virtual ~MessageRouter() = default; 66 67 virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name, 68 ScopedEvent event) = 0; 69 virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0; 70 }; 71 72 class TestNode : public NodeDelegate { 73 public: 74 explicit TestNode(uint64_t id) 75 : node_name_(id, 1), 76 node_(node_name_, this), 77 node_thread_(StringPrintf("Node %" PRIu64 " thread", id).c_str()), 78 events_available_event_(/* manual_reset */ false, 79 /* initially_signaled */ false), 80 idle_event_(/* manual_reset */ true, /* initially_signaled */ true) {} 81 82 ~TestNode() override { 83 StopWhenIdle(); 84 node_thread_.Stop(); 85 } 86 87 const NodeName& name() const { return node_name_; } 88 89 // NOTE: Node is thread-safe. 90 Node& node() { return node_; } 91 92 base::WaitableEvent& idle_event() { return idle_event_; } 93 94 bool IsIdle() { 95 mozilla::MutexAutoLock lock(lock_); 96 return started_ && !dispatching_ && 97 (incoming_events_.empty() || (block_on_event_ && blocked_)); 98 } 99 100 void BlockOnEvent(Event::Type type) { 101 mozilla::MutexAutoLock lock(lock_); 102 blocked_event_type_ = type; 103 block_on_event_ = true; 104 } 105 106 void Unblock() { 107 mozilla::MutexAutoLock lock(lock_); 108 block_on_event_ = false; 109 events_available_event_.Signal(); 110 } 111 112 void Start(MessageRouter* router) { 113 router_ = router; 114 node_thread_.Start(); 115 node_thread_.message_loop()->PostTask(mozilla::NewNonOwningRunnableMethod( 116 "TestNode::ProcessEvents", this, &TestNode::ProcessEvents)); 117 } 118 119 void StopWhenIdle() { 120 mozilla::MutexAutoLock lock(lock_); 121 should_quit_ = true; 122 events_available_event_.Signal(); 123 } 124 125 void WakeUp() { events_available_event_.Signal(); } 126 127 int SendStringMessage(const PortRef& port, const std::string& s) { 128 return node_.SendUserMessage(port, NewUserMessageEvent(s, 0)); 129 } 130 131 int SendMultipleMessages(const PortRef& port, size_t num_messages) { 132 for (size_t i = 0; i < num_messages; ++i) { 133 int result = SendStringMessage(port, ""); 134 if (result != OK) { 135 return result; 136 } 137 } 138 return OK; 139 } 140 141 int SendStringMessageWithPort(const PortRef& port, const std::string& s, 142 const PortName& sent_port_name) { 143 auto event = NewUserMessageEvent(s, 1); 144 event->ports()[0] = sent_port_name; 145 return node_.SendUserMessage(port, std::move(event)); 146 } 147 148 int SendStringMessageWithPort(const PortRef& port, const std::string& s, 149 const PortRef& sent_port) { 150 return SendStringMessageWithPort(port, s, sent_port.name()); 151 } 152 153 void set_drop_messages(bool value) { 154 mozilla::MutexAutoLock lock(lock_); 155 drop_messages_ = value; 156 } 157 158 void set_save_messages(bool value) { 159 mozilla::MutexAutoLock lock(lock_); 160 save_messages_ = value; 161 } 162 163 bool ReadMessage(const PortRef& port, ScopedMessage* message) { 164 return node_.GetMessage(port, message, nullptr) == OK && *message; 165 } 166 167 bool ReadMultipleMessages(const PortRef& port, size_t num_messages) { 168 for (size_t i = 0; i < num_messages; ++i) { 169 ScopedMessage message; 170 if (!ReadMessage(port, &message)) { 171 return false; 172 } 173 } 174 return true; 175 } 176 177 bool GetSavedMessage(ScopedMessage* message) { 178 mozilla::MutexAutoLock lock(lock_); 179 if (saved_messages_.empty()) { 180 message->reset(); 181 return false; 182 } 183 std::swap(*message, saved_messages_.front()); 184 saved_messages_.pop(); 185 return true; 186 } 187 188 void EnqueueEvent(const NodeName& from_node, ScopedEvent event) { 189 idle_event_.Reset(); 190 191 // NOTE: This may be called from ForwardMessage and thus must not reenter 192 // |node_|. 193 mozilla::MutexAutoLock lock(lock_); 194 incoming_events_.push({from_node, std::move(event)}); 195 events_available_event_.Signal(); 196 } 197 198 void ForwardEvent(const NodeName& node_name, ScopedEvent event) override { 199 { 200 mozilla::MutexAutoLock lock(lock_); 201 if (drop_messages_) { 202 DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to " 203 << node_name; 204 205 mozilla::MutexAutoUnlock unlock(lock_); 206 ClosePortsInEvent(event.get()); 207 return; 208 } 209 } 210 211 DCHECK(router_); 212 DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name; 213 router_->ForwardEvent(this, node_name, std::move(event)); 214 } 215 216 void BroadcastEvent(ScopedEvent event) override { 217 router_->BroadcastEvent(this, std::move(event)); 218 } 219 220 void PortStatusChanged(const PortRef& port) override { 221 // The port may be closed, in which case we ignore the notification. 222 mozilla::MutexAutoLock lock(lock_); 223 if (!save_messages_) { 224 return; 225 } 226 227 for (;;) { 228 ScopedMessage message; 229 { 230 mozilla::MutexAutoUnlock unlock(lock_); 231 if (!ReadMessage(port, &message)) { 232 break; 233 } 234 } 235 236 saved_messages_.emplace(std::move(message)); 237 } 238 } 239 240 void ObserveRemoteNode(const NodeName& node) override { 241 DCHECK(node != node_name_); 242 } 243 244 void ClosePortsInEvent(Event* event) { 245 if (event->type() != Event::Type::kUserMessage) { 246 return; 247 } 248 249 UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event); 250 for (size_t i = 0; i < message_event->num_ports(); ++i) { 251 PortRef port; 252 ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port)); 253 EXPECT_EQ(OK, node_.ClosePort(port)); 254 } 255 } 256 257 uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) { 258 PortStatus status{}; 259 if (node_.GetStatus(port_ref, &status) != OK) { 260 return 0; 261 } 262 263 return status.unacknowledged_message_count; 264 } 265 266 void AllowPortMerge(const PortRef& port_ref) { 267 SinglePortLocker locker(&port_ref); 268 locker.port()->pending_merge_peer = true; 269 } 270 271 private: 272 void ProcessEvents() { 273 for (;;) { 274 events_available_event_.Wait(); 275 mozilla::MutexAutoLock lock(lock_); 276 277 if (should_quit_) { 278 return; 279 } 280 281 dispatching_ = true; 282 while (!incoming_events_.empty()) { 283 if (block_on_event_ && 284 incoming_events_.front().second->type() == blocked_event_type_) { 285 blocked_ = true; 286 // Go idle if we hit a blocked event type. 287 break; 288 } 289 blocked_ = false; 290 291 auto node_event_pair = std::move(incoming_events_.front()); 292 incoming_events_.pop(); 293 294 // NOTE: AcceptMessage() can re-enter this object to call any of the 295 // NodeDelegate interface methods. 296 mozilla::MutexAutoUnlock unlock(lock_); 297 node_.AcceptEvent(node_event_pair.first, 298 std::move(node_event_pair.second)); 299 } 300 301 dispatching_ = false; 302 started_ = true; 303 idle_event_.Signal(); 304 }; 305 } 306 307 const NodeName node_name_; 308 Node node_; 309 MessageRouter* router_ = nullptr; 310 311 base::Thread node_thread_; 312 base::WaitableEvent events_available_event_; 313 base::WaitableEvent idle_event_; 314 315 // Guards fields below. 316 mozilla::Mutex lock_ MOZ_UNANNOTATED{"TestNode"}; 317 bool started_ = false; 318 bool dispatching_ = false; 319 bool should_quit_ = false; 320 bool drop_messages_ = false; 321 bool save_messages_ = false; 322 bool blocked_ = false; 323 bool block_on_event_ = false; 324 Event::Type blocked_event_type_{}; 325 std::queue<std::pair<NodeName, ScopedEvent>> incoming_events_; 326 std::queue<ScopedMessage> saved_messages_; 327 }; 328 329 class PortsTest : public testing::Test, public MessageRouter { 330 public: 331 void AddNode(TestNode* node) { 332 { 333 mozilla::MutexAutoLock lock(lock_); 334 nodes_[node->name()] = node; 335 } 336 node->Start(this); 337 } 338 339 void RemoveNode(TestNode* node) { 340 { 341 mozilla::MutexAutoLock lock(lock_); 342 nodes_.erase(node->name()); 343 } 344 345 for (const auto& entry : nodes_) { 346 entry.second->node().LostConnectionToNode(node->name()); 347 } 348 } 349 350 // Waits until all known Nodes are idle. Message forwarding and processing 351 // is handled in such a way that idleness is a stable state: once all nodes in 352 // the system are idle, they will remain idle until the test explicitly 353 // initiates some further event (e.g. sending a message, closing a port, or 354 // removing a Node). 355 void WaitForIdle() { 356 for (;;) { 357 mozilla::MutexAutoLock global_lock(global_lock_); 358 bool all_nodes_idle = true; 359 for (const auto& entry : nodes_) { 360 if (!entry.second->IsIdle()) { 361 all_nodes_idle = false; 362 } 363 entry.second->WakeUp(); 364 } 365 if (all_nodes_idle) { 366 return; 367 } 368 369 // Wait for any Node to signal that it's idle. 370 mozilla::MutexAutoUnlock global_unlock(global_lock_); 371 std::vector<base::WaitableEvent*> events; 372 for (const auto& entry : nodes_) { 373 events.push_back(&entry.second->idle_event()); 374 } 375 base::WaitableEvent::WaitMany(events.data(), events.size()); 376 } 377 } 378 379 void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1, 380 PortRef* port1) { 381 if (node0 == node1) { 382 EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1)); 383 } else { 384 EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0)); 385 EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1)); 386 EXPECT_EQ( 387 OK, node0->node().InitializePort(*port0, node1->name(), port1->name(), 388 node1->name(), port1->name())); 389 EXPECT_EQ( 390 OK, node1->node().InitializePort(*port1, node0->name(), port0->name(), 391 node0->name(), port0->name())); 392 } 393 } 394 395 private: 396 // MessageRouter: 397 void ForwardEvent(TestNode* from_node, const NodeName& node_name, 398 ScopedEvent event) override { 399 mozilla::MutexAutoLock global_lock(global_lock_); 400 mozilla::MutexAutoLock lock(lock_); 401 // Drop messages from nodes that have been removed. 402 if (nodes_.find(from_node->name()) == nodes_.end()) { 403 from_node->ClosePortsInEvent(event.get()); 404 return; 405 } 406 407 auto it = nodes_.find(node_name); 408 if (it == nodes_.end()) { 409 DVLOG(1) << "Node not found: " << node_name; 410 return; 411 } 412 413 // Serialize and de-serialize all forwarded events. 414 size_t buf_size = event->GetSerializedSize(); 415 mozilla::UniquePtr<char[]> buf(new char[buf_size]); 416 event->Serialize(buf.get()); 417 ScopedEvent copy = Event::Deserialize(buf.get(), buf_size); 418 // This should always succeed unless serialization or deserialization 419 // is broken. In that case, the loss of events should cause a test failure. 420 ASSERT_TRUE(copy); 421 422 // Also copy the payload for user messages. 423 if (event->type() == Event::Type::kUserMessage) { 424 UserMessageEvent* message_event = 425 static_cast<UserMessageEvent*>(event.get()); 426 UserMessageEvent* message_copy = 427 static_cast<UserMessageEvent*>(copy.get()); 428 429 message_copy->AttachMessage(mozilla::MakeUnique<TestMessage>( 430 message_event->GetMessage<TestMessage>()->payload())); 431 } 432 433 it->second->EnqueueEvent(from_node->name(), std::move(event)); 434 } 435 436 void BroadcastEvent(TestNode* from_node, ScopedEvent event) override { 437 mozilla::MutexAutoLock global_lock(global_lock_); 438 mozilla::MutexAutoLock lock(lock_); 439 440 // Drop messages from nodes that have been removed. 441 if (nodes_.find(from_node->name()) == nodes_.end()) { 442 return; 443 } 444 445 for (const auto& entry : nodes_) { 446 TestNode* node = entry.second; 447 // Broadcast doesn't deliver to the local node. 448 if (node == from_node) { 449 continue; 450 } 451 node->EnqueueEvent(from_node->name(), event->CloneForBroadcast()); 452 } 453 } 454 455 // Acquired before any operation which makes a Node busy, and before testing 456 // if all nodes are idle. 457 mozilla::Mutex global_lock_ MOZ_UNANNOTATED{"PortsTest Global Lock"}; 458 459 mozilla::Mutex lock_ MOZ_UNANNOTATED{"PortsTest Lock"}; 460 std::map<NodeName, TestNode*> nodes_; 461 }; 462 463 } // namespace 464 465 TEST_F(PortsTest, Basic1) { 466 TestNode node0(0); 467 AddNode(&node0); 468 469 TestNode node1(1); 470 AddNode(&node1); 471 472 PortRef x0, x1; 473 CreatePortPair(&node0, &x0, &node1, &x1); 474 475 PortRef a0, a1; 476 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 477 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); 478 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 479 480 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 481 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 482 483 WaitForIdle(); 484 485 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 486 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 487 } 488 489 TEST_F(PortsTest, Basic2) { 490 TestNode node0(0); 491 AddNode(&node0); 492 493 TestNode node1(1); 494 AddNode(&node1); 495 496 PortRef x0, x1; 497 CreatePortPair(&node0, &x0, &node1, &x1); 498 499 PortRef b0, b1; 500 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); 501 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1)); 502 EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again")); 503 504 EXPECT_EQ(OK, node0.node().ClosePort(b0)); 505 506 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 507 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 508 509 WaitForIdle(); 510 511 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 512 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 513 } 514 515 TEST_F(PortsTest, Basic3) { 516 TestNode node0(0); 517 AddNode(&node0); 518 519 TestNode node1(1); 520 AddNode(&node1); 521 522 PortRef x0, x1; 523 CreatePortPair(&node0, &x0, &node1, &x1); 524 525 PortRef a0, a1; 526 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 527 528 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); 529 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again")); 530 531 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0)); 532 533 PortRef b0, b1; 534 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); 535 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1)); 536 EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz")); 537 538 EXPECT_EQ(OK, node0.node().ClosePort(b0)); 539 540 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 541 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 542 543 WaitForIdle(); 544 545 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 546 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 547 } 548 549 TEST_F(PortsTest, LostConnectionToNode1) { 550 TestNode node0(0); 551 AddNode(&node0); 552 553 TestNode node1(1); 554 AddNode(&node1); 555 node1.set_drop_messages(true); 556 557 PortRef x0, x1; 558 CreatePortPair(&node0, &x0, &node1, &x1); 559 560 // Transfer a port to node1 and simulate a lost connection to node1. 561 562 PortRef a0, a1; 563 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 564 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); 565 566 WaitForIdle(); 567 568 RemoveNode(&node1); 569 570 WaitForIdle(); 571 572 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 573 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 574 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 575 576 WaitForIdle(); 577 578 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 579 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 580 } 581 582 TEST_F(PortsTest, LostConnectionToNode2) { 583 TestNode node0(0); 584 AddNode(&node0); 585 586 TestNode node1(1); 587 AddNode(&node1); 588 589 PortRef x0, x1; 590 CreatePortPair(&node0, &x0, &node1, &x1); 591 592 PortRef a0, a1; 593 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 594 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1)); 595 596 WaitForIdle(); 597 598 node1.set_drop_messages(true); 599 600 RemoveNode(&node1); 601 602 WaitForIdle(); 603 604 // a0 should have eventually detected peer closure after node loss. 605 ScopedMessage message; 606 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, 607 node0.node().GetMessage(a0, &message, nullptr)); 608 EXPECT_FALSE(message); 609 610 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 611 612 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 613 614 EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr)); 615 EXPECT_TRUE(message); 616 node1.ClosePortsInEvent(message.get()); 617 618 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 619 620 WaitForIdle(); 621 622 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 623 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 624 } 625 626 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { 627 // Tests that a proxy gets cleaned up when its indirect peer lives on a lost 628 // node. 629 630 TestNode node0(0); 631 AddNode(&node0); 632 633 TestNode node1(1); 634 AddNode(&node1); 635 636 TestNode node2(2); 637 AddNode(&node2); 638 639 // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. 640 PortRef A, B, C, D; 641 CreatePortPair(&node0, &A, &node1, &B); 642 CreatePortPair(&node1, &C, &node2, &D); 643 644 // Create E-F and send F over A to node 1. 645 PortRef E, F; 646 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); 647 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); 648 649 WaitForIdle(); 650 651 ScopedMessage message; 652 ASSERT_TRUE(node1.ReadMessage(B, &message)); 653 ASSERT_EQ(1u, message->num_ports()); 654 655 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); 656 657 // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 658 // will trivially become aware of the loss, and this test verifies that the 659 // port A on node 0 will eventually also become aware of it. 660 661 // Make sure node2 stops processing events when it encounters an ObserveProxy. 662 node2.BlockOnEvent(Event::Type::kObserveProxy); 663 664 EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); 665 WaitForIdle(); 666 667 // Simulate node 1 and 2 disconnecting. 668 EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); 669 670 // Let node2 continue processing events and wait for everyone to go idle. 671 node2.Unblock(); 672 WaitForIdle(); 673 674 // Port F should be gone. 675 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); 676 677 // Port E should have detected peer closure despite the fact that there is 678 // no longer a continuous route from F to E over which the event could travel. 679 PortStatus status{}; 680 EXPECT_EQ(OK, node0.node().GetStatus(E, &status)); 681 EXPECT_TRUE(status.peer_closed); 682 683 EXPECT_EQ(OK, node0.node().ClosePort(A)); 684 EXPECT_EQ(OK, node1.node().ClosePort(B)); 685 EXPECT_EQ(OK, node1.node().ClosePort(C)); 686 EXPECT_EQ(OK, node0.node().ClosePort(E)); 687 688 WaitForIdle(); 689 690 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 691 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 692 } 693 694 TEST_F(PortsTest, LostConnectionToNodeAfterSendingObserveProxy) { 695 // Tests that a proxy gets cleaned up after a node disconnect if the 696 // previous port already received the ObserveProxy event. 697 698 TestNode node0(0); 699 AddNode(&node0); 700 701 TestNode node1(1); 702 AddNode(&node1); 703 704 TestNode node2(2); 705 AddNode(&node2); 706 707 // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. 708 PortRef A, B, C, D; 709 CreatePortPair(&node0, &A, &node1, &B); 710 CreatePortPair(&node1, &C, &node2, &D); 711 712 // Create E-F and send F over A to node 1. 713 PortRef E, F; 714 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); 715 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); 716 717 WaitForIdle(); 718 719 ScopedMessage message; 720 ASSERT_TRUE(node1.ReadMessage(B, &message)); 721 ASSERT_EQ(1u, message->num_ports()); 722 723 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); 724 725 // Send F over C to node 2 and then simulate node 2 loss from node 1 after 726 // node 0 received the ObserveProxy event. Node 1 needs to clean up the 727 // closed proxy while the node 0 to node 2 connection is still intact. 728 node0.BlockOnEvent(Event::Type::kObserveProxy); 729 730 EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); 731 WaitForIdle(); 732 733 // Simulate node 1 and 2 disconnecting. 734 EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); 735 736 // Let node2 continue processing events and wait for everyone to go idle. 737 node0.Unblock(); 738 WaitForIdle(); 739 740 // Port F should be gone. 741 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); 742 743 EXPECT_EQ(OK, node0.node().ClosePort(A)); 744 EXPECT_EQ(OK, node1.node().ClosePort(B)); 745 EXPECT_EQ(OK, node1.node().ClosePort(C)); 746 EXPECT_EQ(OK, node0.node().ClosePort(E)); 747 748 WaitForIdle(); 749 750 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 751 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 752 } 753 754 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { 755 // Tests that a proxy gets cleaned up when its direct peer lives on a lost 756 // node and it's predecessor lives on the same node. 757 758 TestNode node0(0); 759 AddNode(&node0); 760 761 TestNode node1(1); 762 AddNode(&node1); 763 764 PortRef A, B; 765 CreatePortPair(&node0, &A, &node1, &B); 766 767 PortRef C, D; 768 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); 769 770 // Send D but block node0 on an ObserveProxy event. 771 node0.BlockOnEvent(Event::Type::kObserveProxy); 772 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D)); 773 774 // node0 won't collapse the proxy but node1 will receive the message before 775 // going idle. 776 WaitForIdle(); 777 778 ScopedMessage message; 779 ASSERT_TRUE(node1.ReadMessage(B, &message)); 780 ASSERT_EQ(1u, message->num_ports()); 781 PortRef E; 782 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); 783 784 RemoveNode(&node1); 785 786 node0.Unblock(); 787 WaitForIdle(); 788 789 // Port C should have detected peer closure. 790 PortStatus status{}; 791 EXPECT_EQ(OK, node0.node().GetStatus(C, &status)); 792 EXPECT_TRUE(status.peer_closed); 793 794 EXPECT_EQ(OK, node0.node().ClosePort(A)); 795 EXPECT_EQ(OK, node1.node().ClosePort(B)); 796 EXPECT_EQ(OK, node0.node().ClosePort(C)); 797 EXPECT_EQ(OK, node1.node().ClosePort(E)); 798 799 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 800 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 801 } 802 803 TEST_F(PortsTest, GetMessage1) { 804 TestNode node(0); 805 AddNode(&node); 806 807 PortRef a0, a1; 808 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); 809 810 ScopedMessage message; 811 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); 812 EXPECT_FALSE(message); 813 814 EXPECT_EQ(OK, node.node().ClosePort(a1)); 815 816 WaitForIdle(); 817 818 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, 819 node.node().GetMessage(a0, &message, nullptr)); 820 EXPECT_FALSE(message); 821 822 EXPECT_EQ(OK, node.node().ClosePort(a0)); 823 824 WaitForIdle(); 825 826 EXPECT_TRUE(node.node().CanShutdownCleanly()); 827 } 828 829 TEST_F(PortsTest, GetMessage2) { 830 TestNode node(0); 831 AddNode(&node); 832 833 PortRef a0, a1; 834 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); 835 836 EXPECT_EQ(OK, node.SendStringMessage(a1, "1")); 837 838 ScopedMessage message; 839 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); 840 841 ASSERT_TRUE(message); 842 EXPECT_TRUE(MessageEquals(message, "1")); 843 844 EXPECT_EQ(OK, node.node().ClosePort(a0)); 845 EXPECT_EQ(OK, node.node().ClosePort(a1)); 846 847 EXPECT_TRUE(node.node().CanShutdownCleanly()); 848 } 849 850 TEST_F(PortsTest, GetMessage3) { 851 TestNode node(0); 852 AddNode(&node); 853 854 PortRef a0, a1; 855 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); 856 857 const char* kStrings[] = {"1", "2", "3"}; 858 859 for (auto& kString : kStrings) { 860 EXPECT_EQ(OK, node.SendStringMessage(a1, kString)); 861 } 862 863 ScopedMessage message; 864 for (auto& kString : kStrings) { 865 EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); 866 ASSERT_TRUE(message); 867 EXPECT_TRUE(MessageEquals(message, kString)); 868 } 869 870 EXPECT_EQ(OK, node.node().ClosePort(a0)); 871 EXPECT_EQ(OK, node.node().ClosePort(a1)); 872 873 EXPECT_TRUE(node.node().CanShutdownCleanly()); 874 } 875 876 TEST_F(PortsTest, Delegation1) { 877 TestNode node0(0); 878 AddNode(&node0); 879 880 TestNode node1(1); 881 AddNode(&node1); 882 883 PortRef x0, x1; 884 CreatePortPair(&node0, &x0, &node1, &x1); 885 886 // In this test, we send a message to a port that has been moved. 887 888 PortRef a0, a1; 889 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 890 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1)); 891 WaitForIdle(); 892 893 ScopedMessage message; 894 ASSERT_TRUE(node1.ReadMessage(x1, &message)); 895 ASSERT_EQ(1u, message->num_ports()); 896 EXPECT_TRUE(MessageEquals(message, "a1")); 897 898 // This is "a1" from the point of view of node1. 899 PortName a2_name = message->ports()[0]; 900 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name)); 901 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello")); 902 903 WaitForIdle(); 904 905 ASSERT_TRUE(node0.ReadMessage(x0, &message)); 906 ASSERT_EQ(1u, message->num_ports()); 907 EXPECT_TRUE(MessageEquals(message, "a2")); 908 909 // This is "a2" from the point of view of node1. 910 PortName a3_name = message->ports()[0]; 911 912 PortRef a3; 913 EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3)); 914 915 ASSERT_TRUE(node0.ReadMessage(a3, &message)); 916 EXPECT_EQ(0u, message->num_ports()); 917 EXPECT_TRUE(MessageEquals(message, "hello")); 918 919 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 920 EXPECT_EQ(OK, node0.node().ClosePort(a3)); 921 922 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 923 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 924 925 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 926 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 927 } 928 929 TEST_F(PortsTest, Delegation2) { 930 TestNode node0(0); 931 AddNode(&node0); 932 933 TestNode node1(1); 934 AddNode(&node1); 935 936 for (int i = 0; i < 100; ++i) { 937 // Setup pipe a<->b between node0 and node1. 938 PortRef A, B; 939 CreatePortPair(&node0, &A, &node1, &B); 940 941 PortRef C, D; 942 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); 943 944 PortRef E, F; 945 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); 946 947 node1.set_save_messages(true); 948 949 // Pass D over A to B. 950 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D)); 951 952 // Pass F over C to D. 953 EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F)); 954 955 // This message should find its way to node1. 956 EXPECT_EQ(OK, node0.SendStringMessage(E, "hello")); 957 958 WaitForIdle(); 959 960 EXPECT_EQ(OK, node0.node().ClosePort(C)); 961 EXPECT_EQ(OK, node0.node().ClosePort(E)); 962 963 EXPECT_EQ(OK, node0.node().ClosePort(A)); 964 EXPECT_EQ(OK, node1.node().ClosePort(B)); 965 966 bool got_hello = false; 967 ScopedMessage message; 968 while (node1.GetSavedMessage(&message)) { 969 node1.ClosePortsInEvent(message.get()); 970 if (MessageEquals(message, "hello")) { 971 got_hello = true; 972 break; 973 } 974 } 975 976 EXPECT_TRUE(got_hello); 977 978 WaitForIdle(); // Because closing ports may have generated tasks. 979 } 980 981 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 982 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 983 } 984 985 TEST_F(PortsTest, SendUninitialized) { 986 TestNode node(0); 987 AddNode(&node); 988 989 PortRef x0; 990 EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0)); 991 EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops")); 992 EXPECT_EQ(OK, node.node().ClosePort(x0)); 993 EXPECT_TRUE(node.node().CanShutdownCleanly()); 994 } 995 996 TEST_F(PortsTest, SendFailure) { 997 TestNode node(0); 998 AddNode(&node); 999 1000 node.set_save_messages(true); 1001 1002 PortRef A, B; 1003 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1004 1005 // Try to send A over itself. 1006 1007 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, 1008 node.SendStringMessageWithPort(A, "oops", A)); 1009 1010 // Try to send B over A. 1011 1012 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, 1013 node.SendStringMessageWithPort(A, "nope", B)); 1014 1015 // B should be closed immediately. 1016 EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B)); 1017 1018 WaitForIdle(); 1019 1020 // There should have been no messages accepted. 1021 ScopedMessage message; 1022 EXPECT_FALSE(node.GetSavedMessage(&message)); 1023 1024 EXPECT_EQ(OK, node.node().ClosePort(A)); 1025 1026 WaitForIdle(); 1027 1028 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1029 } 1030 1031 TEST_F(PortsTest, DontLeakUnreceivedPorts) { 1032 TestNode node(0); 1033 AddNode(&node); 1034 1035 PortRef A, B, C, D; 1036 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1037 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); 1038 1039 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); 1040 1041 EXPECT_EQ(OK, node.node().ClosePort(C)); 1042 EXPECT_EQ(OK, node.node().ClosePort(A)); 1043 EXPECT_EQ(OK, node.node().ClosePort(B)); 1044 1045 WaitForIdle(); 1046 1047 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1048 } 1049 1050 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { 1051 TestNode node(0); 1052 AddNode(&node); 1053 1054 PortRef A, B, C, D; 1055 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1056 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); 1057 1058 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); 1059 1060 ScopedMessage message; 1061 EXPECT_TRUE(node.ReadMessage(B, &message)); 1062 ASSERT_EQ(1u, message->num_ports()); 1063 EXPECT_TRUE(MessageEquals(message, "foo")); 1064 PortRef E; 1065 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); 1066 1067 EXPECT_TRUE( 1068 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1069 1070 WaitForIdle(); 1071 1072 EXPECT_TRUE( 1073 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1074 EXPECT_FALSE(node.node().CanShutdownCleanly()); 1075 1076 EXPECT_EQ(OK, node.node().ClosePort(A)); 1077 EXPECT_EQ(OK, node.node().ClosePort(B)); 1078 EXPECT_EQ(OK, node.node().ClosePort(C)); 1079 EXPECT_EQ(OK, node.node().ClosePort(E)); 1080 1081 WaitForIdle(); 1082 1083 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1084 } 1085 1086 TEST_F(PortsTest, ProxyCollapse1) { 1087 TestNode node(0); 1088 AddNode(&node); 1089 1090 PortRef A, B; 1091 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1092 1093 PortRef X, Y; 1094 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); 1095 1096 ScopedMessage message; 1097 1098 // Send B and receive it as C. 1099 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); 1100 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1101 ASSERT_EQ(1u, message->num_ports()); 1102 PortRef C; 1103 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); 1104 1105 // Send C and receive it as D. 1106 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); 1107 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1108 ASSERT_EQ(1u, message->num_ports()); 1109 PortRef D; 1110 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); 1111 1112 // Send D and receive it as E. 1113 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D)); 1114 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1115 ASSERT_EQ(1u, message->num_ports()); 1116 PortRef E; 1117 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); 1118 1119 EXPECT_EQ(OK, node.node().ClosePort(X)); 1120 EXPECT_EQ(OK, node.node().ClosePort(Y)); 1121 1122 EXPECT_EQ(OK, node.node().ClosePort(A)); 1123 EXPECT_EQ(OK, node.node().ClosePort(E)); 1124 1125 // The node should not idle until all proxies are collapsed. 1126 WaitForIdle(); 1127 1128 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1129 } 1130 1131 TEST_F(PortsTest, ProxyCollapse2) { 1132 TestNode node(0); 1133 AddNode(&node); 1134 1135 PortRef A, B; 1136 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1137 1138 PortRef X, Y; 1139 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); 1140 1141 ScopedMessage message; 1142 1143 // Send B and A to create proxies in each direction. 1144 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); 1145 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); 1146 1147 EXPECT_EQ(OK, node.node().ClosePort(X)); 1148 EXPECT_EQ(OK, node.node().ClosePort(Y)); 1149 1150 // At this point we have a scenario with: 1151 // 1152 // D -> [B] -> C -> [A] 1153 // 1154 // Ensure that the proxies can collapse. The sent ports will be closed 1155 // eventually as a result of Y's closure. 1156 1157 WaitForIdle(); 1158 1159 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1160 } 1161 1162 TEST_F(PortsTest, SendWithClosedPeer) { 1163 // This tests that if a port is sent when its peer is already known to be 1164 // closed, the newly created port will be aware of that peer closure, and the 1165 // proxy will eventually collapse. 1166 1167 TestNode node(0); 1168 AddNode(&node); 1169 1170 // Send a message from A to B, then close A. 1171 PortRef A, B; 1172 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1173 EXPECT_EQ(OK, node.SendStringMessage(A, "hey")); 1174 EXPECT_EQ(OK, node.node().ClosePort(A)); 1175 1176 // Now send B over X-Y as new port C. 1177 PortRef X, Y; 1178 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); 1179 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); 1180 ScopedMessage message; 1181 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1182 ASSERT_EQ(1u, message->num_ports()); 1183 PortRef C; 1184 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); 1185 1186 EXPECT_EQ(OK, node.node().ClosePort(X)); 1187 EXPECT_EQ(OK, node.node().ClosePort(Y)); 1188 1189 WaitForIdle(); 1190 1191 // C should have received the message originally sent to B, and it should also 1192 // be aware of A's closure. 1193 1194 ASSERT_TRUE(node.ReadMessage(C, &message)); 1195 EXPECT_TRUE(MessageEquals(message, "hey")); 1196 1197 PortStatus status{}; 1198 EXPECT_EQ(OK, node.node().GetStatus(C, &status)); 1199 EXPECT_FALSE(status.receiving_messages); 1200 EXPECT_FALSE(status.has_messages); 1201 EXPECT_TRUE(status.peer_closed); 1202 1203 node.node().ClosePort(C); 1204 1205 WaitForIdle(); 1206 1207 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1208 } 1209 1210 TEST_F(PortsTest, SendWithClosedPeerSent) { 1211 // This tests that if a port is closed while some number of proxies are still 1212 // routing messages (directly or indirectly) to it, that the peer port is 1213 // eventually notified of the closure, and the dead-end proxies will 1214 // eventually be removed. 1215 1216 TestNode node(0); 1217 AddNode(&node); 1218 1219 PortRef X, Y; 1220 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); 1221 1222 PortRef A, B; 1223 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); 1224 1225 ScopedMessage message; 1226 1227 // Send A as new port C. 1228 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); 1229 1230 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1231 ASSERT_EQ(1u, message->num_ports()); 1232 PortRef C; 1233 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); 1234 1235 // Send C as new port D. 1236 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); 1237 1238 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1239 ASSERT_EQ(1u, message->num_ports()); 1240 PortRef D; 1241 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); 1242 1243 // Send a message to B through D, then close D. 1244 EXPECT_EQ(OK, node.SendStringMessage(D, "hey")); 1245 EXPECT_EQ(OK, node.node().ClosePort(D)); 1246 1247 // Now send B as new port E. 1248 1249 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); 1250 EXPECT_EQ(OK, node.node().ClosePort(X)); 1251 1252 ASSERT_TRUE(node.ReadMessage(Y, &message)); 1253 ASSERT_EQ(1u, message->num_ports()); 1254 PortRef E; 1255 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); 1256 1257 EXPECT_EQ(OK, node.node().ClosePort(Y)); 1258 1259 WaitForIdle(); 1260 1261 // E should receive the message originally sent to B, and it should also be 1262 // aware of D's closure. 1263 1264 ASSERT_TRUE(node.ReadMessage(E, &message)); 1265 EXPECT_TRUE(MessageEquals(message, "hey")); 1266 1267 PortStatus status{}; 1268 EXPECT_EQ(OK, node.node().GetStatus(E, &status)); 1269 EXPECT_FALSE(status.receiving_messages); 1270 EXPECT_FALSE(status.has_messages); 1271 EXPECT_TRUE(status.peer_closed); 1272 1273 EXPECT_EQ(OK, node.node().ClosePort(E)); 1274 1275 WaitForIdle(); 1276 1277 EXPECT_TRUE(node.node().CanShutdownCleanly()); 1278 } 1279 1280 TEST_F(PortsTest, MergePorts) { 1281 TestNode node0(0); 1282 AddNode(&node0); 1283 1284 TestNode node1(1); 1285 AddNode(&node1); 1286 1287 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1288 PortRef A, B, C, D; 1289 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1290 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1291 1292 // Write a message on A. 1293 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); 1294 1295 // Initiate a merge between B and C. 1296 node1.AllowPortMerge(C); 1297 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1298 1299 WaitForIdle(); 1300 1301 // Expect all proxies to be gone once idle. 1302 EXPECT_TRUE( 1303 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1304 EXPECT_TRUE( 1305 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1306 1307 // Expect D to have received the message sent on A. 1308 ScopedMessage message; 1309 ASSERT_TRUE(node1.ReadMessage(D, &message)); 1310 EXPECT_TRUE(MessageEquals(message, "hey")); 1311 1312 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1313 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1314 1315 // No more ports should be open. 1316 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1317 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1318 } 1319 1320 TEST_F(PortsTest, MergePortWithClosedPeer1) { 1321 // This tests that the right thing happens when initiating a merge on a port 1322 // whose peer has already been closed. 1323 1324 TestNode node0(0); 1325 AddNode(&node0); 1326 1327 TestNode node1(1); 1328 AddNode(&node1); 1329 1330 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1331 PortRef A, B, C, D; 1332 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1333 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1334 1335 // Write a message on A. 1336 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); 1337 1338 // Close A. 1339 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1340 1341 // Initiate a merge between B and C. 1342 node1.AllowPortMerge(C); 1343 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1344 1345 WaitForIdle(); 1346 1347 // Expect all proxies to be gone once idle. node0 should have no ports since 1348 // A was explicitly closed. 1349 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1350 EXPECT_TRUE( 1351 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1352 1353 // Expect D to have received the message sent on A. 1354 ScopedMessage message; 1355 ASSERT_TRUE(node1.ReadMessage(D, &message)); 1356 EXPECT_TRUE(MessageEquals(message, "hey")); 1357 1358 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1359 1360 // No more ports should be open. 1361 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1362 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1363 } 1364 1365 TEST_F(PortsTest, MergePortWithClosedPeer2) { 1366 // This tests that the right thing happens when merging into a port whose peer 1367 // has already been closed. 1368 1369 TestNode node0(0); 1370 AddNode(&node0); 1371 1372 TestNode node1(1); 1373 AddNode(&node1); 1374 1375 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1376 PortRef A, B, C, D; 1377 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1378 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1379 1380 // Write a message on D and close it. 1381 EXPECT_EQ(OK, node1.SendStringMessage(D, "hey")); 1382 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1383 1384 // Initiate a merge between B and C. 1385 node1.AllowPortMerge(C); 1386 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1387 1388 WaitForIdle(); 1389 1390 // Expect all proxies to be gone once idle. node1 should have no ports since 1391 // D was explicitly closed. 1392 EXPECT_TRUE( 1393 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); 1394 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1395 1396 // Expect A to have received the message sent on D. 1397 ScopedMessage message; 1398 ASSERT_TRUE(node0.ReadMessage(A, &message)); 1399 EXPECT_TRUE(MessageEquals(message, "hey")); 1400 1401 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1402 1403 // No more ports should be open. 1404 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1405 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1406 } 1407 1408 TEST_F(PortsTest, MergePortsWithClosedPeers) { 1409 // This tests that no residual ports are left behind if two ports are merged 1410 // when both of their peers have been closed. 1411 1412 TestNode node0(0); 1413 AddNode(&node0); 1414 1415 TestNode node1(1); 1416 AddNode(&node1); 1417 1418 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1419 PortRef A, B, C, D; 1420 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1421 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1422 1423 // Close A and D. 1424 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1425 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1426 1427 WaitForIdle(); 1428 1429 // Initiate a merge between B and C. 1430 node1.AllowPortMerge(C); 1431 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1432 1433 WaitForIdle(); 1434 1435 // Expect everything to have gone away. 1436 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1437 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1438 } 1439 1440 TEST_F(PortsTest, MergePortsWithMovedPeers) { 1441 // This tests that ports can be merged successfully even if their peers are 1442 // moved around. 1443 1444 TestNode node0(0); 1445 AddNode(&node0); 1446 1447 TestNode node1(1); 1448 AddNode(&node1); 1449 1450 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1451 PortRef A, B, C, D; 1452 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1453 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1454 1455 // Set up another pair X-Y for moving ports on node0. 1456 PortRef X, Y; 1457 EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y)); 1458 1459 ScopedMessage message; 1460 1461 // Move A to new port E. 1462 EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A)); 1463 ASSERT_TRUE(node0.ReadMessage(Y, &message)); 1464 ASSERT_EQ(1u, message->num_ports()); 1465 PortRef E; 1466 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); 1467 1468 EXPECT_EQ(OK, node0.node().ClosePort(X)); 1469 EXPECT_EQ(OK, node0.node().ClosePort(Y)); 1470 1471 // Write messages on E and D. 1472 EXPECT_EQ(OK, node0.SendStringMessage(E, "hey")); 1473 EXPECT_EQ(OK, node1.SendStringMessage(D, "hi")); 1474 1475 // Initiate a merge between B and C. 1476 node1.AllowPortMerge(C); 1477 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1478 1479 WaitForIdle(); 1480 1481 // Expect to receive D's message on E and E's message on D. 1482 ASSERT_TRUE(node0.ReadMessage(E, &message)); 1483 EXPECT_TRUE(MessageEquals(message, "hi")); 1484 ASSERT_TRUE(node1.ReadMessage(D, &message)); 1485 EXPECT_TRUE(MessageEquals(message, "hey")); 1486 1487 // Close E and D. 1488 EXPECT_EQ(OK, node0.node().ClosePort(E)); 1489 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1490 1491 WaitForIdle(); 1492 1493 // Expect everything to have gone away. 1494 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1495 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1496 } 1497 1498 TEST_F(PortsTest, MergePortsFailsGracefully) { 1499 // This tests that the system remains in a well-defined state if something 1500 // goes wrong during port merge. 1501 1502 TestNode node0(0); 1503 AddNode(&node0); 1504 1505 TestNode node1(1); 1506 AddNode(&node1); 1507 1508 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1509 PortRef A, B, C, D; 1510 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); 1511 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); 1512 1513 ScopedMessage message; 1514 PortRef X, Y; 1515 EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X)); 1516 EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y)); 1517 EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name(), 1518 node1.name(), Y.name())); 1519 EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name(), 1520 node0.name(), X.name())); 1521 1522 // Block the merge from proceeding until we can do something stupid with port 1523 // C. This avoids the test logic racing with async merge logic. 1524 node1.BlockOnEvent(Event::Type::kMergePort); 1525 1526 // Initiate the merge between B and C. 1527 node1.AllowPortMerge(C); 1528 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); 1529 1530 // Move C to a new port E. This is not a sane use of Node's public API but 1531 // is still hypothetically possible. It allows us to force a merge failure 1532 // because C will be in an invalid state by the time the merge is processed. 1533 // As a result, B should be closed. 1534 EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C)); 1535 1536 node1.Unblock(); 1537 1538 WaitForIdle(); 1539 1540 ASSERT_TRUE(node0.ReadMessage(X, &message)); 1541 ASSERT_EQ(1u, message->num_ports()); 1542 PortRef E; 1543 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); 1544 1545 EXPECT_EQ(OK, node0.node().ClosePort(X)); 1546 EXPECT_EQ(OK, node1.node().ClosePort(Y)); 1547 1548 WaitForIdle(); 1549 1550 // C goes away as a result of normal proxy removal. B should have been closed 1551 // cleanly by the failed MergePorts. 1552 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C)); 1553 EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B)); 1554 1555 // Close A, D, and E. 1556 EXPECT_EQ(OK, node0.node().ClosePort(A)); 1557 EXPECT_EQ(OK, node1.node().ClosePort(D)); 1558 EXPECT_EQ(OK, node0.node().ClosePort(E)); 1559 1560 WaitForIdle(); 1561 1562 // Expect everything to have gone away. 1563 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1564 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1565 } 1566 1567 TEST_F(PortsTest, RemotePeerStatus) { 1568 TestNode node0(0); 1569 AddNode(&node0); 1570 1571 TestNode node1(1); 1572 AddNode(&node1); 1573 1574 // Create a local port pair. Neither port should appear to have a remote peer. 1575 PortRef a, b; 1576 PortStatus status{}; 1577 node0.node().CreatePortPair(&a, &b); 1578 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1579 EXPECT_FALSE(status.peer_remote); 1580 ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); 1581 EXPECT_FALSE(status.peer_remote); 1582 1583 // Create a port pair spanning the two nodes. Both spanning ports should 1584 // immediately appear to have a remote peer. 1585 PortRef x0, x1; 1586 CreatePortPair(&node0, &x0, &node1, &x1); 1587 1588 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); 1589 EXPECT_TRUE(status.peer_remote); 1590 ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); 1591 EXPECT_TRUE(status.peer_remote); 1592 1593 PortRef x2, x3; 1594 CreatePortPair(&node0, &x2, &node1, &x3); 1595 1596 // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers 1597 // remote and the remote peers local. 1598 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b)); 1599 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1)); 1600 WaitForIdle(); 1601 1602 ScopedMessage message; 1603 ASSERT_TRUE(node0.ReadMessage(x2, &message)); 1604 ASSERT_EQ(1u, message->num_ports()); 1605 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1)); 1606 1607 ASSERT_TRUE(node1.ReadMessage(x3, &message)); 1608 ASSERT_EQ(1u, message->num_ports()); 1609 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b)); 1610 1611 // Now x0-x1 should be local to node0 and a-b should span the nodes. 1612 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); 1613 EXPECT_FALSE(status.peer_remote); 1614 ASSERT_EQ(OK, node0.node().GetStatus(x1, &status)); 1615 EXPECT_FALSE(status.peer_remote); 1616 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1617 EXPECT_TRUE(status.peer_remote); 1618 ASSERT_EQ(OK, node1.node().GetStatus(b, &status)); 1619 EXPECT_TRUE(status.peer_remote); 1620 1621 // And swap them back one more time. 1622 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1)); 1623 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b)); 1624 WaitForIdle(); 1625 1626 ASSERT_TRUE(node0.ReadMessage(x2, &message)); 1627 ASSERT_EQ(1u, message->num_ports()); 1628 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b)); 1629 1630 ASSERT_TRUE(node1.ReadMessage(x3, &message)); 1631 ASSERT_EQ(1u, message->num_ports()); 1632 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1)); 1633 1634 ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); 1635 EXPECT_TRUE(status.peer_remote); 1636 ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); 1637 EXPECT_TRUE(status.peer_remote); 1638 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1639 EXPECT_FALSE(status.peer_remote); 1640 ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); 1641 EXPECT_FALSE(status.peer_remote); 1642 1643 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 1644 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 1645 EXPECT_EQ(OK, node0.node().ClosePort(x2)); 1646 EXPECT_EQ(OK, node1.node().ClosePort(x3)); 1647 EXPECT_EQ(OK, node0.node().ClosePort(a)); 1648 EXPECT_EQ(OK, node0.node().ClosePort(b)); 1649 1650 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1651 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1652 } 1653 1654 TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) { 1655 TestNode node0(0); 1656 AddNode(&node0); 1657 1658 TestNode node1(1); 1659 AddNode(&node1); 1660 1661 // Set up a-b on node0 and c-d spanning node0-node1. 1662 PortRef a, b, c, d; 1663 node0.node().CreatePortPair(&a, &b); 1664 CreatePortPair(&node0, &c, &node1, &d); 1665 1666 PortStatus status{}; 1667 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1668 EXPECT_FALSE(status.peer_remote); 1669 ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); 1670 EXPECT_FALSE(status.peer_remote); 1671 ASSERT_EQ(OK, node0.node().GetStatus(c, &status)); 1672 EXPECT_TRUE(status.peer_remote); 1673 ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); 1674 EXPECT_TRUE(status.peer_remote); 1675 1676 EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c)); 1677 WaitForIdle(); 1678 1679 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1680 EXPECT_TRUE(status.peer_remote); 1681 ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); 1682 EXPECT_TRUE(status.peer_remote); 1683 1684 EXPECT_EQ(OK, node0.node().ClosePort(a)); 1685 EXPECT_EQ(OK, node1.node().ClosePort(d)); 1686 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1687 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1688 } 1689 1690 TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) { 1691 TestNode node0(0); 1692 AddNode(&node0); 1693 1694 TestNode node1(1); 1695 AddNode(&node1); 1696 1697 // Set up a-b on node0 and c-d on node1. 1698 PortRef a, b, c, d; 1699 node0.node().CreatePortPair(&a, &b); 1700 node1.node().CreatePortPair(&c, &d); 1701 1702 PortStatus status{}; 1703 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1704 EXPECT_FALSE(status.peer_remote); 1705 ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); 1706 EXPECT_FALSE(status.peer_remote); 1707 ASSERT_EQ(OK, node1.node().GetStatus(c, &status)); 1708 EXPECT_FALSE(status.peer_remote); 1709 ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); 1710 EXPECT_FALSE(status.peer_remote); 1711 1712 node1.AllowPortMerge(c); 1713 EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name())); 1714 WaitForIdle(); 1715 1716 ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); 1717 EXPECT_TRUE(status.peer_remote); 1718 ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); 1719 EXPECT_TRUE(status.peer_remote); 1720 1721 EXPECT_EQ(OK, node0.node().ClosePort(a)); 1722 EXPECT_EQ(OK, node1.node().ClosePort(d)); 1723 EXPECT_TRUE(node0.node().CanShutdownCleanly()); 1724 EXPECT_TRUE(node1.node().CanShutdownCleanly()); 1725 } 1726 1727 TEST_F(PortsTest, RetransmitUserMessageEvents) { 1728 // Ensures that user message events can be retransmitted properly. 1729 TestNode node0(0); 1730 AddNode(&node0); 1731 1732 PortRef a, b; 1733 node0.node().CreatePortPair(&a, &b); 1734 1735 // Ping. 1736 const char* kMessage = "hey"; 1737 ScopedMessage message; 1738 EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage)); 1739 ASSERT_TRUE(node0.ReadMessage(b, &message)); 1740 EXPECT_TRUE(MessageEquals(message, kMessage)); 1741 1742 // Pong. 1743 EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); 1744 EXPECT_FALSE(message); 1745 ASSERT_TRUE(node0.ReadMessage(a, &message)); 1746 EXPECT_TRUE(MessageEquals(message, kMessage)); 1747 1748 // Ping again. 1749 EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message))); 1750 EXPECT_FALSE(message); 1751 ASSERT_TRUE(node0.ReadMessage(b, &message)); 1752 EXPECT_TRUE(MessageEquals(message, kMessage)); 1753 1754 // Pong again! 1755 EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); 1756 EXPECT_FALSE(message); 1757 ASSERT_TRUE(node0.ReadMessage(a, &message)); 1758 EXPECT_TRUE(MessageEquals(message, kMessage)); 1759 1760 EXPECT_EQ(OK, node0.node().ClosePort(a)); 1761 EXPECT_EQ(OK, node0.node().ClosePort(b)); 1762 } 1763 1764 TEST_F(PortsTest, SetAcknowledgeRequestInterval) { 1765 TestNode node0(0); 1766 AddNode(&node0); 1767 1768 PortRef a0, a1; 1769 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); 1770 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); 1771 1772 // Send a batch of messages. 1773 EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15)); 1774 EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0)); 1775 EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5)); 1776 WaitForIdle(); 1777 EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0)); 1778 1779 // Set to acknowledge every read message, and validate that already-read 1780 // messages are acknowledged. 1781 EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1)); 1782 WaitForIdle(); 1783 EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0)); 1784 1785 // Read a third of the messages from the other end. 1786 EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5)); 1787 WaitForIdle(); 1788 1789 EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0)); 1790 1791 TestNode node1(1); 1792 AddNode(&node1); 1793 1794 // Transfer a1 across to node1. 1795 PortRef x0, x1; 1796 CreatePortPair(&node0, &x0, &node1, &x1); 1797 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); 1798 WaitForIdle(); 1799 1800 ScopedMessage message; 1801 ASSERT_TRUE(node1.ReadMessage(x1, &message)); 1802 ASSERT_EQ(1u, message->num_ports()); 1803 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1)); 1804 1805 // Read the last third of the messages from the transferred node, and 1806 // validate that the unacknowledge message count updates correctly. 1807 EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5)); 1808 WaitForIdle(); 1809 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); 1810 1811 // Turn the acknowledges down and validate that they don't go on indefinitely. 1812 EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0)); 1813 EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10)); 1814 WaitForIdle(); 1815 EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10)); 1816 WaitForIdle(); 1817 EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0)); 1818 1819 // Close the far port and validate that the closure updates the unacknowledged 1820 // count. 1821 EXPECT_EQ(OK, node1.node().ClosePort(a1)); 1822 WaitForIdle(); 1823 EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0)); 1824 1825 EXPECT_EQ(OK, node0.node().ClosePort(a0)); 1826 EXPECT_EQ(OK, node0.node().ClosePort(x0)); 1827 EXPECT_EQ(OK, node1.node().ClosePort(x1)); 1828 } 1829 1830 } // namespace test 1831 } // namespace ports 1832 } // namespace core 1833 } // namespace mojo