node.cc (76891B)
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/core/ports/node.h"

#include <string.h>

#include <algorithm>
#include <atomic>
#include <utility>
#include <vector>

#include "mozilla/Mutex.h"
#include "mozilla/RandomNum.h"
#include "nsTArray.h"

#include "base/logging.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/node_delegate.h"
#include "mojo/core/ports/port_locker.h"

namespace mojo {
namespace core {
namespace ports {

namespace {

// Logs an unexpected-condition message and returns |error_code| unchanged,
// so call sites can write `return OOPS(ERROR_...);`.
int DebugError(const char* message, int error_code) {
  NOTREACHED() << "Oops: " << message;
  return error_code;
}

#define OOPS(x) DebugError(#x, x)

// Returns true if |port| may still deliver messages to the embedder.
// Expects the caller to hold |port|'s lock (all call sites below do).
bool CanAcceptMoreMessages(const Port* port) {
  // Have we already doled out the last message (i.e., do we expect to NOT
  // receive further messages)?
  uint64_t next_sequence_num = port->message_queue.next_sequence_num();
  if (port->state == Port::kClosed) {
    return false;
  }
  if (port->peer_closed || port->remove_proxy_on_last_message) {
    // If the peer vanished without a closure handshake, there is no reliable
    // last_sequence_num_to_receive; just report whether anything is queued.
    if (port->peer_lost_unexpectedly) {
      return port->message_queue.HasNextMessage();
    }
    // next_sequence_num - 1 is the most recently consumed sequence number;
    // once it reaches the advertised final message, no more can arrive.
    if (port->last_sequence_num_to_receive == next_sequence_num - 1) {
      return false;
    }
  }
  return true;
}

// Fills |name| with a freshly generated random port name (two random
// 64-bit words).
void GenerateRandomPortName(PortName* name) {
  // FIXME: Chrome uses a cache to avoid extra calls to the system RNG when
  // generating port names to keep this overhead down. If this method starts
  // showing up on profiles we should consider doing the same.
  *name = PortName{mozilla::RandomUint64OrDie(), mozilla::RandomUint64OrDie()};
}

}  // namespace

Node::Node(const NodeName& name, NodeDelegate* delegate)
    : name_(name), delegate_(this, delegate) {}

Node::~Node() {
  if (!ports_.empty()) {
    DLOG(WARNING) << "Unclean shutdown for node " << name_;
  }
}

// Returns true if this node holds no ports that would make shutdown unsafe
// under |policy|. With ALLOW_LOCAL_PORTS, ports whose peer lives on this
// same node and that are in the kReceiving state are tolerated.
bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock ports_lock(ports_lock_);

  if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
#ifdef DEBUG
    for (auto& entry : ports_) {
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << entry.second->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << entry.second->state << ")";
    }
#endif
    return ports_.empty();
  }

  DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);

  // NOTE: This is not efficient, though it probably doesn't need to be since
  // relatively few ports should be open during shutdown and shutdown doesn't
  // need to be blazingly fast.
  bool can_shutdown = true;
  for (auto& entry : ports_) {
    PortRef port_ref(entry.first, entry.second);
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
      can_shutdown = false;
#ifdef DEBUG
      // In debug builds keep iterating so that every blocking port is logged.
      DVLOG(2) << "Port " << entry.first << " referencing node "
               << port->peer_node_name << " is blocking shutdown of "
               << "node " << name_ << " (state=" << port->state << ")";
#else
      // Exit early when not debugging.
      break;
#endif
    }
  }

  return can_shutdown;
}

// Looks up |port_name| on this node. On success fills |port_ref| with a
// reference that keeps the Port alive; otherwise returns ERROR_PORT_UNKNOWN.
int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock lock(ports_lock_);
  auto iter = ports_.find(port_name);
  if (iter == ports_.end()) {
    return ERROR_PORT_UNKNOWN;
  }

#if defined(ANDROID) && defined(__aarch64__)
  // Workaround for https://crbug.com/665869.
  std::atomic_thread_fence(std::memory_order_seq_cst);
#endif

  *port_ref = PortRef(port_name, iter->second);
  return OK;
}

// Creates a new port under a random name in the kUninitialized state.
// The port is unusable until InitializePort() is called for it.
int Node::CreateUninitializedPort(PortRef* port_ref) {
  PortName port_name;
  GenerateRandomPortName(&port_name);

  RefPtr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
  int rv = AddPortWithName(port_name, port);
  if (rv != OK) {
    return rv;
  }

  *port_ref = PortRef(port_name, std::move(port));
  return OK;
}

// Transitions an uninitialized port into the kReceiving state, wiring up its
// peer and previous-peer addresses. Returns ERROR_PORT_STATE_UNEXPECTED if
// the port was already initialized.
int Node::InitializePort(const PortRef& port_ref,
                         const NodeName& peer_node_name,
                         const PortName& peer_port_name,
                         const NodeName& prev_node_name,
                         const PortName& prev_port_name) {
  {
    // Must be acquired for UpdatePortPeerAddress below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_lock(ports_lock_);

    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kUninitialized) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    port->state = Port::kReceiving;
    UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
                          peer_port_name);

    port->prev_node_name = prev_node_name;
    port->prev_port_name = prev_port_name;
  }

  // The status change is reported only after the locks above are released.
  delegate_->PortStatusChanged(port_ref);

  return OK;
}

// Creates a connected pair of local ports; each port is both the peer and
// the previous peer of the other.
int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
  int rv;

  rv = CreateUninitializedPort(port0_ref);
  if (rv != OK) {
    return rv;
  }

  rv = CreateUninitializedPort(port1_ref);
  if (rv != OK) {
    return rv;
  }

  rv = InitializePort(*port0_ref, name_, port1_ref->name(), name_,
                      port1_ref->name());
  if (rv != OK) {
    return rv;
  }

  rv = InitializePort(*port1_ref, name_, port0_ref->name(), name_,
                      port0_ref->name());
  if (rv != OK) {
    return rv;
  }

  return OK;
}

// Attaches opaque embedder data to the port. Fails once the port is closed.
int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) {
  SinglePortLocker locker(&port_ref);
  auto* port = locker.port();
  if (port->state == Port::kClosed) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  port->user_data = std::move(user_data);

  return OK;
}

// Retrieves the embedder data previously stored via SetUserData().
int Node::GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data) {
  SinglePortLocker locker(&port_ref);
  auto* port = locker.port();
  if (port->state == Port::kClosed) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  *user_data = port->user_data;

  return OK;
}

// Closes a port. For a receiving port this notifies the peer via
// ObserveClosure (carrying the last sent sequence number) and recursively
// closes any ports still attached to undelivered messages so they are not
// leaked. Uninitialized ports are simply erased.
int Node::ClosePort(const PortRef& port_ref) {
  std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
  NodeName peer_node_name;
  PortName peer_port_name;
  uint64_t sequence_num = 0;
  uint64_t last_sequence_num = 0;
  bool was_initialized = false;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    switch (port->state) {
      case Port::kUninitialized:
        break;

      case Port::kReceiving:
        was_initialized = true;
        port->state = Port::kClosed;

        // We pass along the sequence number of the last message sent from this
        // port to allow the peer to have the opportunity to consume all inbound
        // messages before notifying the embedder that this port is closed.
        last_sequence_num = port->next_sequence_num_to_send - 1;

        peer_node_name = port->peer_node_name;
        peer_port_name = port->peer_port_name;

        sequence_num = port->next_control_sequence_num_to_send++;

        // If the port being closed still has unread messages, then we need to
        // take care to close those ports so as to avoid leaking memory.
        port->message_queue.TakeAllMessages(&undelivered_messages);
        port->TakePendingMessages(undelivered_messages);
        break;

      default:
        return ERROR_PORT_STATE_UNEXPECTED;
    }
  }

  ErasePort(port_ref.name());

  if (was_initialized) {
    DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
             << name_ << " to " << peer_port_name << "@" << peer_node_name;
    delegate_->ForwardEvent(
        peer_node_name,
        mozilla::MakeUnique<ObserveClosureEvent>(
            peer_port_name, port_ref.name(), sequence_num, last_sequence_num));
    // Recursively close every port that was riding along in a message we
    // never delivered.
    for (const auto& message : undelivered_messages) {
      for (size_t i = 0; i < message->num_ports(); ++i) {
        PortRef ref;
        if (GetPort(message->ports()[i], &ref) == OK) {
          ClosePort(ref);
        }
      }
    }
  }
  return OK;
}

// Reports a snapshot of the port's queue and peer state to the embedder.
// Only valid for ports in the kReceiving state.
int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
  SinglePortLocker locker(&port_ref);
  auto* port = locker.port();
  if (port->state != Port::kReceiving) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  // TODO(sroettger): include messages pending
  // sender verification here?
  port_status->has_messages = port->message_queue.HasNextMessage();
  port_status->receiving_messages = CanAcceptMoreMessages(port);
  port_status->peer_closed = port->peer_closed;
  port_status->peer_remote = port->peer_node_name != name_;
  port_status->queued_message_count =
      port->message_queue.queued_message_count();
  port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
  // Count of messages sent from this port that the peer has not yet
  // acknowledged reading.
  port_status->unacknowledged_message_count =
      port->next_sequence_num_to_send - port->last_sequence_num_acknowledged -
      1;

#ifdef FUZZING_SNAPSHOT
  port_status->peer_node_name = port->peer_node_name;
#endif

  return OK;
}

// Pops the next available message (subject to |filter|) from the port's
// queue into |*message|. Returns ERROR_PORT_PEER_CLOSED only once every
// deliverable message has been consumed. May emit a read-ack control event
// to the peer when the acknowledge threshold is crossed.
int Node::GetMessage(const PortRef& port_ref,
                     mozilla::UniquePtr<UserMessageEvent>* message,
                     MessageFilter* filter) {
  *message = nullptr;

  DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;

  NodeName peer_node_name;
  ScopedEvent ack_event;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port)) {
      return ERROR_PORT_PEER_CLOSED;
    }

    port->message_queue.GetNextMessage(message, filter);
    // If this read reaches the sequence number the peer asked us to
    // acknowledge, build the ack now (under the lock) and forward it below,
    // after the port lock is released.
    if (*message &&
        (*message)->sequence_num() == port->sequence_num_to_acknowledge) {
      peer_node_name = port->peer_node_name;
      ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
          port->peer_port_name, port_ref.name(),
          port->next_control_sequence_num_to_send++,
          port->sequence_num_to_acknowledge);
    }
    if (*message) {
      // Message will be passed to the user, no need to block the queue.
      port->message_queue.MessageProcessed();
    }
  }

  if (ack_event) {
    delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
  }

  // Allow referenced ports to trigger PortStatusChanged calls.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      PortRef new_port_ref;
      int rv = GetPort((*message)->ports()[i], &new_port_ref);

      DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
                        << " does not exist!";

      SinglePortLocker locker(&new_port_ref);
      DCHECK_EQ(locker.port()->state, Port::kReceiving);
      locker.port()->message_queue.set_signalable(true);
    }

    // The user may retransmit this message from another port. We reset the
    // sequence number so that the message will get a new one if that happens.
    (*message)->set_sequence_num(0);
  }

  return OK;
}

// Sends |message| from |port_ref|. On failure, closes every port attached to
// the message — except the sending port itself — so they are not leaked.
int Node::SendUserMessage(const PortRef& port_ref,
                          mozilla::UniquePtr<UserMessageEvent> message) {
  // NOTE(review): the cleanup below dereferences |message| after the call,
  // which assumes SendUserMessageInternal leaves it intact on failure —
  // confirm against its definition (not visible in this chunk).
  int rv = SendUserMessageInternal(port_ref, &message);
  if (rv != OK) {
    // If send failed, close all carried ports. Note that we're careful not to
    // close the sending port itself if it happened to be one of the encoded
    // ports (an invalid but possible condition.)
    for (size_t i = 0; i < message->num_ports(); ++i) {
      if (message->ports()[i] == port_ref.name()) {
        continue;
      }

      PortRef port;
      if (GetPort(message->ports()[i], &port) == OK) {
        ClosePort(port);
      }
    }
  }
  return rv;
}

// Asks the peer to acknowledge reads every
// |sequence_num_acknowledge_interval| messages. An interval of 0 disables
// acknowledge requests (no event is sent in that case).
int Node::SetAcknowledgeRequestInterval(
    const PortRef& port_ref, uint64_t sequence_num_acknowledge_interval) {
  NodeName peer_node_name;
  PortName peer_port_name;
  uint64_t sequence_num_to_request_ack = 0;
  uint64_t sequence_num = 0;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kReceiving) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
    if (!sequence_num_acknowledge_interval) {
      return OK;
    }

    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;

    sequence_num_to_request_ack = port->last_sequence_num_acknowledged +
                                  sequence_num_acknowledge_interval;
    sequence_num = port->next_control_sequence_num_to_send++;
  }

  delegate_->ForwardEvent(peer_node_name,
                          mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
                              peer_port_name, port_ref.name(), sequence_num,
                              sequence_num_to_request_ack));
  return OK;
}

// Returns true for event types that travel along the port's previous-peer
// edge and therefore carry a control sequence number that must be validated
// in AcceptEvent().
bool Node::IsEventFromPreviousPeer(const Event& event) {
  switch (event.type()) {
    case Event::Type::kUserMessage:
      return true;
    case Event::Type::kPortAccepted:
      // PortAccepted is sent by the next peer
      return false;
    case Event::Type::kObserveProxy:
      // ObserveProxy with an invalid port name is a broadcast event
      return event.port_name() != kInvalidPortName;
    case Event::Type::kObserveProxyAck:
      return true;
    case Event::Type::kObserveClosure:
      return true;
    case Event::Type::kMergePort:
      // MergePort is not from the previous peer
      return false;
    case Event::Type::kUserMessageReadAckRequest:
      return
          true;
    case Event::Type::kUserMessageReadAck:
      return true;
    case Event::Type::kUpdatePreviousPeer:
      return true;
    default:
      // No need to check unknown message types since AcceptPeer will return
      // an error.
      return false;
  }
}

// Dispatches |event| to the handler matching its concrete type. |port_ref|
// may be invalid; each handler copes with unknown ports itself.
int Node::AcceptEventInternal(const PortRef& port_ref,
                              const NodeName& from_node, ScopedEvent event) {
  switch (event->type()) {
    case Event::Type::kUserMessage:
      return OnUserMessage(port_ref, from_node,
                           Event::Cast<UserMessageEvent>(&event));
    case Event::Type::kPortAccepted:
      return OnPortAccepted(port_ref, Event::Cast<PortAcceptedEvent>(&event));
    case Event::Type::kObserveProxy:
      return OnObserveProxy(port_ref, Event::Cast<ObserveProxyEvent>(&event));
    case Event::Type::kObserveProxyAck:
      return OnObserveProxyAck(port_ref,
                               Event::Cast<ObserveProxyAckEvent>(&event));
    case Event::Type::kObserveClosure:
      return OnObserveClosure(port_ref,
                              Event::Cast<ObserveClosureEvent>(&event));
    case Event::Type::kMergePort:
      return OnMergePort(port_ref, Event::Cast<MergePortEvent>(&event));
    case Event::Type::kUserMessageReadAckRequest:
      return OnUserMessageReadAckRequest(
          port_ref, Event::Cast<UserMessageReadAckRequestEvent>(&event));
    case Event::Type::kUserMessageReadAck:
      return OnUserMessageReadAck(port_ref,
                                  Event::Cast<UserMessageReadAckEvent>(&event));
    case Event::Type::kUpdatePreviousPeer:
      return OnUpdatePreviousPeer(port_ref,
                                  Event::Cast<UpdatePreviousPeerEvent>(&event));
  }
  return OOPS(ERROR_NOT_IMPLEMENTED);
}

// Entry point for events arriving from |from_node|. For events that come
// from the previous peer, verifies the sender and control sequence number,
// buffering events that arrive out of order; afterwards drains any buffered
// events that became ready while the current one was processed.
int Node::AcceptEvent(const NodeName& from_node, ScopedEvent event) {
  PortRef port_ref;
  GetPort(event->port_name(), &port_ref);

  DVLOG(2) << "AcceptEvent type: " << event->type() << ", "
           << event->from_port() << "@" << from_node << " => "
           << port_ref.name() << "@" << name_
           << " seq nr: " << event->control_sequence_num() << " port valid? "
           << port_ref.is_valid();

  if (!IsEventFromPreviousPeer(*event)) {
    DCHECK_EQ(event->control_sequence_num(), kInvalidSequenceNum);
    // Some events are not coming from the previous peer, e.g. broadcasts or
    // PortAccepted events. No need to check the sequence number or sender.
    return AcceptEventInternal(port_ref, from_node, std::move(event));
  }

  DCHECK_NE(event->control_sequence_num(), kInvalidSequenceNum);

  if (!port_ref.is_valid()) {
    // If we don't have a valid port, there's nothing for us to check. However,
    // we pass the ref on to AcceptEventInternal to make sure there's no race
    // where it becomes valid and we skipped the peer check.
    return AcceptEventInternal(port_ref, from_node, std::move(event));
  }

#ifndef FUZZING_SNAPSHOT
  // Before processing the event, verify the sender and sequence number.
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (!port->IsNextEvent(from_node, *event)) {
      DVLOG(2) << "Buffering event (type " << event->type()
               << "): " << event->from_port() << "@" << from_node << " => "
               << port_ref.name() << "@" << name_
               << " seq nr: " << event->control_sequence_num() << " / "
               << port->next_control_sequence_num_to_receive << ", want "
               << port->prev_port_name << "@" << port->prev_node_name;

      port->BufferEvent(from_node, std::move(event));
      return OK;
    }
  }
#endif

  int ret = AcceptEventInternal(port_ref, from_node, std::move(event));

  // More events might have been enqueued during processing.
  while (true) {
    ScopedEvent next_event;
    NodeName next_from_node;
    {
      SinglePortLocker locker(&port_ref);
      auto* port = locker.port();
      // We always increment the control sequence number after we finished
      // processing the event. That way we ensure that the events are handled
      // in order without keeping a lock the whole time.
      port->next_control_sequence_num_to_receive++;
      port->NextEvent(&next_from_node, &next_event);

      if (next_event) {
        DVLOG(2) << "Handling buffered event (type " << next_event->type()
                 << "): " << next_event->from_port() << "@" << next_from_node
                 << " => " << port_ref.name() << "@" << name_
                 << " seq nr: " << next_event->control_sequence_num() << " / "
                 << port->next_control_sequence_num_to_receive;
      }
    }
    if (!next_event) {
      break;
    }
    AcceptEventInternal(port_ref, next_from_node, std::move(next_event));
  }

  return ret;
}

// Converts |port_ref| into a proxy toward |destination_node_name| and asks
// that node to merge it with |destination_port_name|'s port cycle.
int Node::MergePorts(const PortRef& port_ref,
                     const NodeName& destination_node_name,
                     const PortName& destination_port_name) {
  PortName new_port_name;
  Event::PortDescriptor new_port_descriptor;
  PendingUpdatePreviousPeer pending_update_event{.from_port = port_ref.name()};
  {
    // Must be held for ConvertToProxy.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    SinglePortLocker locker(&port_ref);

    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
             << " to " << destination_port_name << "@" << destination_node_name;

    // Send the port-to-merge over to the destination node so it can be merged
    // into the port cycle atomically there.
    new_port_name = port_ref.name();
    ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
                   &new_port_descriptor, &pending_update_event);
  }

  // Let the old previous peer know where the converted port now lives.
  delegate_->ForwardEvent(
      pending_update_event.receiver,
      mozilla::MakeUnique<UpdatePreviousPeerEvent>(
          pending_update_event.port, pending_update_event.from_port,
          pending_update_event.sequence_num, pending_update_event.new_prev_node,
          pending_update_event.new_prev_port));

  if (new_port_descriptor.peer_node_name == name_ &&
      destination_node_name != name_) {
    // Ensure that the locally retained peer of the new proxy gets a status
    // update so it notices that its peer is now remote.
    PortRef local_peer;
    if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK) {
      delegate_->PortStatusChanged(local_peer);
    }
  }

  delegate_->ForwardEvent(
      destination_node_name,
      mozilla::MakeUnique<MergePortEvent>(destination_port_name,
                                          kInvalidPortName, kInvalidSequenceNum,
                                          new_port_name, new_port_descriptor));
  return OK;
}

// Merges two ports that both live on this node into a single cycle.
int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
  DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
           << " and " << port1_ref.name() << "@" << name_;
  return MergePortsInternal(port0_ref, port1_ref,
                            true /* allow_close_on_bad_state */);
}

int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
           << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}

// Handles an incoming user message: binds any transferred ports to this
// node, enqueues the message (unless this port is buffering/proxying), and
// closes the newly bound ports again if the message cannot be accepted.
int Node::OnUserMessage(const PortRef& port_ref, const NodeName& from_node,
                        mozilla::UniquePtr<UserMessageEvent> message) {
#ifdef DEBUG
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0) {
      ports_buf << ",";
    }
    ports_buf << message->ports()[i];
  }

  DVLOG(4) << "OnUserMessage " << message->sequence_num()
           << " [ports=" << ports_buf.str() << "] at " << message->port_name()
           << "@" << name_;
#endif

  // Even if this port does not exist, cannot receive anymore messages or is
  // buffering or proxying messages, we still need these ports to be bound to
  // this node. When the message is forwarded, these ports will get transferred
  // following the usual method. If the message cannot be accepted, then the
  // newly bound ports will simply be closed.
  if (from_node != name_) {
    for (size_t i = 0; i < message->num_ports(); ++i) {
      Event::PortDescriptor& descriptor = message->port_descriptors()[i];
      int rv = AcceptPort(message->ports()[i], descriptor);
      if (rv != OK) {
        return rv;
      }
    }
  }

  bool has_next_message = false;
  bool message_accepted = false;
  bool should_forward_messages = false;
  if (port_ref.is_valid()) {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port)) {
      message_accepted = true;
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      if (port->state == Port::kBuffering) {
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;
        should_forward_messages = true;
      }
    }
  }

  if (should_forward_messages) {
    int rv = ForwardUserMessagesFromProxy(port_ref);
    if (rv != OK) {
      return rv;
    }
    TryRemoveProxy(port_ref);
  }

  if (!message_accepted) {
    // |message| was not moved into the queue on this path, so it is still
    // safe to inspect its attached ports here.
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef attached_port_ref;
      if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
        ClosePort(attached_port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}

// The node we transferred a port to has accepted it; we may now start
// proxying events through to it.
int Node::OnPortAccepted(const PortRef& port_ref,
                         mozilla::UniquePtr<PortAcceptedEvent> event) {
  if (!port_ref.is_valid()) {
    return ERROR_PORT_UNKNOWN;
  }

#ifdef DEBUG
  {
    SinglePortLocker locker(&port_ref);
    DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
             << " pointing to " << locker.port()->peer_port_name << "@"
             << locker.port()->peer_node_name;
  }
#endif

  return BeginProxying(port_ref);
}

// Reacts to notification that our peer has become a proxy: a receiving port
// re-points itself at the proxy's target and acks the proxy; ports in other
// states forward or defer the event as appropriate.
int Node::OnObserveProxy(const PortRef& port_ref,
                         mozilla::UniquePtr<ObserveProxyEvent> event) {
  if (event->port_name() == kInvalidPortName) {
    // An ObserveProxy with an invalid target port name is a broadcast used to
    // inform ports when their peer (which was itself a proxy) has become
    // defunct due to unexpected node disconnection.
    //
    // Receiving ports affected by this treat it as equivalent to peer closure.
    // Proxies affected by this can be removed and will in turn broadcast their
    // own death with a similar message.
    DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
    DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
    DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
    return OK;
  }

  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  if (!port_ref.is_valid()) {
    DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
             << " not found";
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
           << ", proxy at " << event->proxy_port_name() << "@"
           << event->proxy_node_name() << " pointing to "
           << event->proxy_target_port_name() << "@"
           << event->proxy_target_node_name();

  bool peer_changed = false;
  ScopedEvent event_to_forward;
  NodeName event_target_node;
  {
    // Must be acquired for UpdatePortPeerAddress below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    if (port->peer_node_name == event->proxy_node_name() &&
        port->peer_port_name == event->proxy_port_name()) {
      if (port->state == Port::kReceiving) {
        // Updating the port peer will reset the sequence num. Grab it now;
        uint64_t sequence_num = port->next_control_sequence_num_to_send++;
        UpdatePortPeerAddress(port_ref.name(), port,
                              event->proxy_target_node_name(),
                              event->proxy_target_port_name());
        event_target_node = event->proxy_node_name();
        event_to_forward = mozilla::MakeUnique<ObserveProxyAckEvent>(
            event->proxy_port_name(), port_ref.name(), sequence_num,
            port->next_sequence_num_to_send - 1);
        peer_changed = true;
        DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
                 << "@" << name_ << " to " << event->proxy_port_name() << "@"
                 << event_target_node;
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of ObserveProxyAck.
        // After all, another port could be sending messages to our peer now
        // that we've sent out our own ObserveProxy event. Instead, we will
        // send an ObserveProxyAck indicating that the ObserveProxy event
        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
        // However, this has to be done after we are removed as a proxy.
        // Otherwise, we might just find ourselves back here again, which
        // would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
                 << "@" << event->proxy_node_name();

        port->send_on_proxy_removal =
            mozilla::MakeUnique<std::pair<NodeName, ScopedEvent>>(
                event->proxy_node_name(),
                mozilla::MakeUnique<ObserveProxyAckEvent>(
                    event->proxy_port_name(), port_ref.name(),
                    kInvalidSequenceNum, kInvalidSequenceNum));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
814 event_target_node = port->peer_node_name; 815 event->set_port_name(port->peer_port_name); 816 event->set_from_port(port_ref.name()); 817 event->set_control_sequence_num( 818 port->next_control_sequence_num_to_send++); 819 if (port->state == Port::kBuffering) { 820 port->control_message_queue.push_back( 821 {event_target_node, std::move(event)}); 822 } else { 823 event_to_forward = std::move(event); 824 } 825 } 826 } 827 828 if (event_to_forward) { 829 delegate_->ForwardEvent(event_target_node, std::move(event_to_forward)); 830 } 831 832 if (peer_changed) { 833 // Re-send ack and/or ack requests, as the previous peer proxy may not have 834 // forwarded the previous request before it died. 835 MaybeResendAck(port_ref); 836 MaybeResendAckRequest(port_ref); 837 838 delegate_->PortStatusChanged(port_ref); 839 840 if (event->proxy_target_node_name() != name_) { 841 delegate_->ObserveRemoteNode(event->proxy_target_node_name()); 842 } 843 } 844 845 return OK; 846 } 847 848 int Node::OnObserveProxyAck(const PortRef& port_ref, 849 mozilla::UniquePtr<ObserveProxyAckEvent> event) { 850 DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_ 851 << " (last_sequence_num=" << event->last_sequence_num() << ")"; 852 853 if (!port_ref.is_valid()) { 854 return ERROR_PORT_UNKNOWN; // The port may have observed closure first. 855 } 856 857 bool try_remove_proxy_immediately; 858 bool erase_port = false; 859 { 860 SinglePortLocker locker(&port_ref); 861 auto* port = locker.port(); 862 863 if (port->state == Port::kProxying) { 864 // If the last sequence number is invalid, this is a signal that we need 865 // to retransmit the ObserveProxy event for this port rather than flagging 866 // the the proxy for removal ASAP. 867 try_remove_proxy_immediately = 868 event->last_sequence_num() != kInvalidSequenceNum; 869 if (try_remove_proxy_immediately) { 870 // We can now remove this port once we have received and forwarded the 871 // last message addressed to this port. 
872 port->remove_proxy_on_last_message = true; 873 port->last_sequence_num_to_receive = event->last_sequence_num(); 874 } 875 } else if (port->state == Port::kClosed) { 876 erase_port = true; 877 } else { 878 return OOPS(ERROR_PORT_STATE_UNEXPECTED); 879 } 880 } 881 882 if (erase_port) { 883 ErasePort(port_ref.name()); 884 return OK; 885 } 886 887 if (try_remove_proxy_immediately) { 888 TryRemoveProxy(port_ref); 889 } else { 890 InitiateProxyRemoval(port_ref); 891 } 892 893 return OK; 894 } 895 896 int Node::OnObserveClosure(const PortRef& port_ref, 897 mozilla::UniquePtr<ObserveClosureEvent> event) { 898 // OK if the port doesn't exist, as it may have been closed already. 899 if (!port_ref.is_valid()) { 900 return OK; 901 } 902 903 // This message tells the port that it should no longer expect more messages 904 // beyond last_sequence_num. This message is forwarded along until we reach 905 // the receiving end, and this message serves as an equivalent to 906 // ObserveProxyAck. 907 908 bool notify_delegate = false; 909 NodeName peer_node_name; 910 bool try_remove_proxy = false; 911 bool erase_port = false; 912 { 913 SinglePortLocker locker(&port_ref); 914 auto* port = locker.port(); 915 916 port->peer_closed = true; 917 port->last_sequence_num_to_receive = event->last_sequence_num(); 918 919 DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_ 920 << " (state=" << port->state << ") pointing to " 921 << port->peer_port_name << "@" << port->peer_node_name 922 << " (last_sequence_num=" << event->last_sequence_num() << ")"; 923 924 // We always forward ObserveClosure, even beyond the receiving port which 925 // cares about it. This ensures that any dead-end proxies beyond that port 926 // are notified to remove themselves. 927 928 if (port->state == Port::kReceiving) { 929 notify_delegate = true; 930 931 // When forwarding along the other half of the port cycle, this will only 932 // reach dead-end proxies. 
Tell them we've sent our last message so they 933 // can go away. 934 // 935 // TODO: Repurposing ObserveClosure for this has the desired result but 936 // may be semantically confusing since the forwarding port is not actually 937 // closed. Consider replacing this with a new event type. 938 event->set_last_sequence_num(port->next_sequence_num_to_send - 1); 939 940 // Treat the closure as an acknowledge that all sent messages have been 941 // read from the other end. 942 port->last_sequence_num_acknowledged = 943 port->next_sequence_num_to_send - 1; 944 } else if (port->state == Port::kClosed) { 945 // This is the ack for a closed proxy port notification. Now it's fine to 946 // delete the port. 947 erase_port = true; 948 } else { 949 // We haven't yet reached the receiving peer of the closed port, so we'll 950 // forward the message along as-is. 951 // See about removing the port if it is a proxy as our peer won't be able 952 // to participate in proxy removal. 953 port->remove_proxy_on_last_message = true; 954 if (port->state == Port::kProxying) { 955 try_remove_proxy = true; 956 } 957 } 958 959 DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@" 960 << name_ << " to peer " << port->peer_port_name << "@" 961 << port->peer_node_name 962 << " (last_sequence_num=" << event->last_sequence_num() << ")"; 963 964 event->set_port_name(port->peer_port_name); 965 event->set_from_port(port_ref.name()); 966 event->set_control_sequence_num(port->next_control_sequence_num_to_send++); 967 peer_node_name = port->peer_node_name; 968 969 if (port->state == Port::kBuffering) { 970 port->control_message_queue.push_back({peer_node_name, std::move(event)}); 971 } 972 } 973 974 if (try_remove_proxy) { 975 TryRemoveProxy(port_ref); 976 } 977 978 if (erase_port) { 979 ErasePort(port_ref.name()); 980 } 981 982 if (event) { 983 delegate_->ForwardEvent(peer_node_name, std::move(event)); 984 } 985 986 if (notify_delegate) { 987 delegate_->PortStatusChanged(port_ref); 988 
  }

  return OK;
}

// Handles a MergePortEvent delivered to |port_ref|. The event carries a new
// proxy port descriptor; we accept that port first (so its remote peer is
// never left stranded), validate that both ports exist and that |port_ref|
// was actually expecting a merge, then hand off to MergePortsInternal. Every
// failure path closes whichever port(s) are valid to keep the cycle sound.
int Node::OnMergePort(const PortRef& port_ref,
                      mozilla::UniquePtr<MergePortEvent> event) {
  DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
           << " merging with proxy " << event->new_port_name() << "@" << name_
           << " pointing to " << event->new_port_descriptor().peer_port_name
           << "@" << event->new_port_descriptor().peer_node_name
           << " referred by "
           << event->new_port_descriptor().referring_port_name << "@"
           << event->new_port_descriptor().referring_node_name;

  // Accept the new port. This is now the receiving end of the other port cycle
  // to be merged with ours. Note that we always attempt to accept the new port
  // first as otherwise its peer receiving port could be left stranded
  // indefinitely.
  if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
    if (port_ref.is_valid()) {
      ClosePort(port_ref);
    }
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  PortRef new_port_ref;
  GetPort(event->new_port_name(), &new_port_ref);
  // If exactly one side of the merge is missing, close the side we do have so
  // its peer learns of the failure.
  if (!port_ref.is_valid() && new_port_ref.is_valid()) {
    ClosePort(new_port_ref);
    return ERROR_PORT_UNKNOWN;
  }
  if (port_ref.is_valid() && !new_port_ref.is_valid()) {
    ClosePort(port_ref);
    return ERROR_PORT_UNKNOWN;
  }

  // Only a port that previously armed |pending_merge_peer| may be merged;
  // anything else is a protocol violation from the sender.
  bool peer_allowed = true;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (!port->pending_merge_peer) {
      CHROMIUM_LOG(ERROR) << "MergePort called on unexpected port: "
                          << event->port_name();
      peer_allowed = false;
    } else {
      port->pending_merge_peer = false;
    }
  }
  if (!peer_allowed) {
    ClosePort(port_ref);
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  return MergePortsInternal(port_ref, new_port_ref,
                            false /* allow_close_on_bad_state */);
}

// Handles a peer's request to be acknowledged once this port has read a
// given sequence number (or immediately, if it has already been read).
int Node::OnUserMessageReadAckRequest(
    const PortRef& port_ref,
    mozilla::UniquePtr<UserMessageReadAckRequestEvent> event) {
  DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
           << event->sequence_num_to_acknowledge();

  if (!port_ref.is_valid()) {
    return ERROR_PORT_UNKNOWN;
  }

  NodeName peer_node_name;
  mozilla::UniquePtr<Event> event_to_send;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    peer_node_name = port->peer_node_name;
    if (port->state == Port::kProxying) {
      // Proxies simply forward the ack request to their peer.
      event->set_port_name(port->peer_port_name);
      event->set_from_port(port_ref.name());
      event->set_control_sequence_num(
          port->next_control_sequence_num_to_send++);
      event_to_send = std::move(event);
    } else {
      // Last sequence number handed out by the local message queue.
      uint64_t current_sequence_num =
          port->message_queue.next_sequence_num() - 1;
      // Either this is requesting an ack for a sequence number already read, or
      // else for a sequence number that is yet to be read.
      if (current_sequence_num >= event->sequence_num_to_acknowledge()) {
        // If the current sequence number to read already exceeds the ack
        // request, send an ack immediately.
        event_to_send = mozilla::MakeUnique<UserMessageReadAckEvent>(
            port->peer_port_name, port_ref.name(),
            port->next_control_sequence_num_to_send++, current_sequence_num);

        if (port->state == Port::kBuffering) {
          // Moving into the buffer leaves |event_to_send| null, so the
          // ForwardEvent at the bottom is skipped; the queued event is
          // forwarded once buffering ends (see BeginProxying).
          port->control_message_queue.push_back(
              {peer_node_name, std::move(event_to_send)});
        }

        // This might be a late or duplicate acknowledge request, that's
        // requesting acknowledge for an already read message. There may already
        // have been a request for future reads, so take care not to back up
        // the requested acknowledge counter.
        if (current_sequence_num > port->sequence_num_to_acknowledge) {
          port->sequence_num_to_acknowledge = current_sequence_num;
        }
      } else {
        // This is request to ack a sequence number that hasn't been read yet.
        // The state of the port can either be that it already has a
        // future-requested ack, or not. Because ack requests aren't guaranteed
        // to arrive in order, store the earlier of the current queued request
        // and the new one, if one was already requested.
        bool has_queued_ack_request =
            port->sequence_num_to_acknowledge > current_sequence_num;
        if (!has_queued_ack_request ||
            port->sequence_num_to_acknowledge >
                event->sequence_num_to_acknowledge()) {
          port->sequence_num_to_acknowledge =
              event->sequence_num_to_acknowledge();
        }
        return OK;
      }
    }
  }

  if (event_to_send) {
    delegate_->ForwardEvent(peer_node_name, std::move(event_to_send));
  }

  return OK;
}

// Handles the peer's acknowledgement that it has read messages up to a given
// sequence number. Keeps only the largest acknowledged value, optionally
// schedules the next ack request when an interval is configured and the peer
// is still open, and notifies the delegate of the status change.
int Node::OnUserMessageReadAck(
    const PortRef& port_ref,
    mozilla::UniquePtr<UserMessageReadAckEvent> event) {
  DVLOG(1) << "Acknowledge " << port_ref.name() << "@" << name_ << " sequence "
           << event->sequence_num_acknowledged();

  NodeName peer_node_name;
  ScopedEvent ack_request_event;
  if (port_ref.is_valid()) {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    // An ack for a message we never sent is bogus; ignore it.
    if (event->sequence_num_acknowledged() >= port->next_sequence_num_to_send) {
      // TODO(http://crbug.com/980952): This is a malformed event.
      // This could return a new error "ERROR_MALFORMED_EVENT" which the
      // delegate could use as a signal to drop the peer node.
      return OK;
    }

    // Keep the largest acknowledge seen.
    if (event->sequence_num_acknowledged() <=
        port->last_sequence_num_acknowledged) {
      // The acknowledge was late or a duplicate, it's safe to ignore it.
      return OK;
    }

    port->last_sequence_num_acknowledged = event->sequence_num_acknowledged();
    // Send another ack request if the interval is non-zero and the peer has
    // not been closed.
    if (port->sequence_num_acknowledge_interval && !port->peer_closed) {
      peer_node_name = port->peer_node_name;
      ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
          port->peer_port_name, port_ref.name(),
          port->next_control_sequence_num_to_send++,
          port->last_sequence_num_acknowledged +
              port->sequence_num_acknowledge_interval);
      DCHECK_NE(port->state, Port::kBuffering);
    }
  }
  if (ack_request_event) {
    delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
  }

  if (port_ref.is_valid()) {
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}

// Handles notification that the port immediately upstream of |port_ref| has
// changed identity (sent e.g. when a proxy between us and the previous peer
// is removed or when ports are merged). Records the new previous node/port
// and resets the expected control sequence number for that link.
int Node::OnUpdatePreviousPeer(
    const PortRef& port_ref,
    mozilla::UniquePtr<UpdatePreviousPeerEvent> event) {
  DVLOG(1) << "OnUpdatePreviousPeer port: " << event->port_name()
           << " changing to " << event->new_node_name()
           << ", port: " << event->from_port() << " => "
           << event->new_port_name();

  if (!port_ref.is_valid()) {
    return ERROR_PORT_UNKNOWN;
  }

  const NodeName& new_node_name = event->new_node_name();
  const PortName& new_port_name = event->new_port_name();
  DCHECK_NE(new_node_name, kInvalidNodeName);
  DCHECK_NE(new_port_name, kInvalidPortName);
  if (new_node_name == kInvalidNodeName || new_port_name == kInvalidPortName) {
    return ERROR_PORT_STATE_UNEXPECTED;
  }

  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();

    port->prev_node_name = new_node_name;
    port->prev_port_name = new_port_name;
    // The sequence number will get incremented after this event has been
    // handled.
    port->next_control_sequence_num_to_receive = kInitialSequenceNum - 1;
  }

  return OK;
}

// Registers |port| in this node under |port_name|. If the port already knows
// its peer, it is also indexed in |peer_port_maps_| so peer-targeted lookups
// (e.g. DestroyAllPortsWithPeer) can find it. Fails only on a name collision,
// which would indicate a broken random-name generator.
int Node::AddPortWithName(const PortName& port_name, RefPtr<Port> port) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock lock(ports_lock_);
  if (port->peer_port_name != kInvalidPortName) {
    DCHECK_NE(kInvalidNodeName, port->peer_node_name);
    peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
        port_name, PortRef(port_name, port));
  }
  if (!ports_.emplace(port_name, std::move(port)).second) {
    return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
  }
  DVLOG(2) << "Created port " << port_name << "@" << name_;
  return OK;
}

// Unregisters |port_name| from this node's port and peer maps. Safe to call
// for an unknown port (no-op). Any queued user messages are destroyed after
// all locks are dropped.
void Node::ErasePort(const PortName& port_name) {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  RefPtr<Port> port;
  {
    mozilla::MutexAutoLock lock(ports_lock_);
    auto it = ports_.find(port_name);
    if (it == ports_.end()) {
      return;
    }
    port = std::move(it->second);
    ports_.erase(it);

    RemoveFromPeerPortMap(port_name, port.get());
  }
  // NOTE: We are careful not to release the port's messages while holding any
  // locks, since they may run arbitrary user code upon destruction.
  std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
  {
    PortRef port_ref(port_name, std::move(port));
    SinglePortLocker locker(&port_ref);
    locker.port()->message_queue.TakeAllMessages(&messages);
  }
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}

// Sends a user message from |port_ref|: stamps routing metadata, rejects a
// message carrying its own sending port, and either forwards the event to a
// remote node or loops it back locally via AcceptEvent. Once the message has
// been prepared, transport-level failures are deliberately swallowed.
int Node::SendUserMessageInternal(
    const PortRef& port_ref, mozilla::UniquePtr<UserMessageEvent>* message) {
  mozilla::UniquePtr<UserMessageEvent>& m = *message;

  m->set_from_port(port_ref.name());

  // A port cannot be transferred inside a message sent over itself.
  for (size_t i = 0; i < m->num_ports(); ++i) {
    if (m->ports()[i] == port_ref.name()) {
      return ERROR_PORT_CANNOT_SEND_SELF;
    }
  }

  NodeName target_node;
  int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
                                       false /* ignore_closed_peer */, m.get(),
                                       &target_node);
  if (rv != OK) {
    return rv;
  }

  // Beyond this point there's no sense in returning anything but OK. Even if
  // message forwarding or acceptance fails, there's nothing the embedder can
  // do to recover. Assume that failure beyond this point must be treated as a
  // transport failure.

  DCHECK_NE(kInvalidNodeName, target_node);
  if (target_node != name_) {
    delegate_->ForwardEvent(target_node, std::move(m));
    return OK;
  }

  int accept_result = AcceptEvent(name_, std::move(m));
  if (accept_result != OK) {
    // See comment above for why we don't return an error in this case.
    DVLOG(2) << "AcceptEvent failed: " << accept_result;
  }

  return OK;
}

// Fuses two local receiving ports into one cycle by swapping their peers and
// turning both into proxies. On any failure the swap is undone (when it
// happened) and the eligible ports are closed, keeping the system consistent.
int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
                             bool allow_close_on_bad_state) {
  const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
  PendingUpdatePreviousPeer pending_update_events[2];
  // Saved so the control sequence counters can be restored if the merge has
  // to be rolled back below.
  uint64_t original_sequence_number[2];
  {
    // Needed to swap peer map entries below.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::ReleasableMutexAutoLock ports_locker(ports_lock_);

    // Maybe<> so the port locks can be dropped (reset) before ClosePort is
    // called on the failure path while |ports_locker| is still in scope.
    mozilla::Maybe<PortLocker> locker(std::in_place, port_refs, size_t(2));
    auto* port0 = locker->GetPort(port0_ref);
    auto* port1 = locker->GetPort(port1_ref);

    // There are several conditions which must be met before we'll consider
    // merging two ports:
    //
    // - They must both be in the kReceiving state
    // - They must not be each other's peer
    // - They must have never sent a user message
    //
    // If any of these criteria are not met, we fail early.
    if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
        (port0->peer_node_name == name_ &&
         port0->peer_port_name == port1_ref.name()) ||
        (port1->peer_node_name == name_ &&
         port1->peer_port_name == port0_ref.name()) ||
        port0->next_sequence_num_to_send != kInitialSequenceNum ||
        port1->next_sequence_num_to_send != kInitialSequenceNum) {
      // On failure, we only close a port if it was at least properly in the
      // |kReceiving| state. This avoids getting the system in an inconsistent
      // state by e.g. closing a proxy abruptly.
      //
      // Note that we must release the port locks before closing ports.
      const bool close_port0 =
          port0->state == Port::kReceiving || allow_close_on_bad_state;
      const bool close_port1 =
          port1->state == Port::kReceiving || allow_close_on_bad_state;
      locker.reset();
      ports_locker.Unlock();
      if (close_port0) {
        ClosePort(port0_ref);
      }
      if (close_port1) {
        ClosePort(port1_ref);
      }
      return ERROR_PORT_STATE_UNEXPECTED;
    }

    // Tell each port's current peer that its upstream link is about to become
    // the *other* local port. These events are sent only after message
    // forwarding succeeds (see below).
    pending_update_events[0] = {
        .receiver = port0->peer_node_name,
        .port = port0->peer_port_name,
        .from_port = port0_ref.name(),
        .sequence_num = port0->next_control_sequence_num_to_send++,
        .new_prev_node = name_,
        .new_prev_port = port1_ref.name()};
    pending_update_events[1] = {
        .receiver = port1->peer_node_name,
        .port = port1->peer_port_name,
        .from_port = port1_ref.name(),
        .sequence_num = port1->next_control_sequence_num_to_send++,
        .new_prev_node = name_,
        .new_prev_port = port0_ref.name()};

    // Swap the ports' peer information and switch them both to proxying mode.
    SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
    port0->state = Port::kProxying;
    port1->state = Port::kProxying;
    // Each port now talks to a brand-new peer link, so its control sequence
    // restarts; remember the old counters for potential rollback.
    original_sequence_number[0] = port0->next_control_sequence_num_to_send;
    original_sequence_number[1] = port1->next_control_sequence_num_to_send;
    port0->next_control_sequence_num_to_send = kInitialSequenceNum;
    port1->next_control_sequence_num_to_send = kInitialSequenceNum;
    if (port0->peer_closed) {
      port0->remove_proxy_on_last_message = true;
    }
    if (port1->peer_closed) {
      port1->remove_proxy_on_last_message = true;
    }
  }

  // Flush any queued messages from the new proxies and, if successful, complete
  // the merge by initiating proxy removals.
  if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
      ForwardUserMessagesFromProxy(port1_ref) == OK) {
    // Send the prev peer updates out after the forwarding the user messages
    // succeeded. Otherwise, we won't be able to restore the previous state
    // below.
    for (const auto& pending_update_event : pending_update_events) {
      delegate_->ForwardEvent(
          pending_update_event.receiver,
          mozilla::MakeUnique<UpdatePreviousPeerEvent>(
              pending_update_event.port, pending_update_event.from_port,
              pending_update_event.sequence_num,
              pending_update_event.new_prev_node,
              pending_update_event.new_prev_port));
    }

    for (const auto* const port_ref : port_refs) {
      bool try_remove_proxy_immediately = false;
      ScopedEvent closure_event;
      NodeName closure_event_target_node;
      {
        SinglePortLocker locker(port_ref);
        auto* port = locker.port();
        DCHECK_EQ(port->state, Port::kProxying);
        try_remove_proxy_immediately = port->remove_proxy_on_last_message;
        if (try_remove_proxy_immediately || port->peer_closed) {
          // If either end of the port cycle is closed, we propagate an
          // ObserveClosure event.
          closure_event_target_node = port->peer_node_name;
          closure_event = mozilla::MakeUnique<ObserveClosureEvent>(
              port->peer_port_name, port_ref->name(),
              port->next_control_sequence_num_to_send++,
              port->last_sequence_num_to_receive);
        }
      }
      if (try_remove_proxy_immediately) {
        TryRemoveProxy(*port_ref);
      } else {
        InitiateProxyRemoval(*port_ref);
      }

      if (closure_event) {
        delegate_->ForwardEvent(closure_event_target_node,
                                std::move(closure_event));
      }
    }

    return OK;
  }

  // If we failed to forward proxied messages, we keep the system in a
  // consistent state by undoing the peer swap and closing the ports.
  {
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);
    PortLocker locker(port_refs, 2);
    auto* port0 = locker.GetPort(port0_ref);
    auto* port1 = locker.GetPort(port1_ref);
    SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
    port0->remove_proxy_on_last_message = false;
    port1->remove_proxy_on_last_message = false;
    DCHECK_EQ(Port::kProxying, port0->state);
    DCHECK_EQ(Port::kProxying, port1->state);
    port0->state = Port::kReceiving;
    port1->state = Port::kReceiving;
    port0->next_control_sequence_num_to_send = original_sequence_number[0];
    port1->next_control_sequence_num_to_send = original_sequence_number[1];
  }

  ClosePort(port0_ref);
  ClosePort(port1_ref);
  return ERROR_PORT_STATE_UNEXPECTED;
}

// Converts the locked receiving port |*port_name| into a buffering proxy that
// will forward to a new port on |to_node_name|. Fills in |port_descriptor| so
// the destination node can reconstruct the port, renames |*port_name| to the
// freshly generated name, and records in |pending_update| the event that must
// later tell the old peer about its new upstream link. Caller must hold the
// port's lock (asserted) and, per the callers in this file, |ports_lock_|.
void Node::ConvertToProxy(Port* port, const NodeName& to_node_name,
                          PortName* port_name,
                          Event::PortDescriptor* port_descriptor,
                          PendingUpdatePreviousPeer* pending_update) {
  port->AssertLockAcquired();
  PortName local_port_name = *port_name;

  PortName new_port_name;
  GenerateRandomPortName(&new_port_name);

  pending_update->receiver = port->peer_node_name;
  pending_update->port = port->peer_port_name;
  pending_update->sequence_num = port->next_control_sequence_num_to_send++;
  pending_update->new_prev_node = to_node_name;
  pending_update->new_prev_port = new_port_name;

  // Make sure we don't send messages to the new peer until after we know it
  // exists. In the meantime, just buffer messages locally.
  DCHECK_EQ(port->state, Port::kReceiving);
  port->state = Port::kBuffering;

  // If we already know our peer is closed, we already know this proxy can
  // be removed once it receives and forwards its last expected message.
  if (port->peer_closed) {
    port->remove_proxy_on_last_message = true;
  }

  *port_name = new_port_name;

  // Snapshot everything the accepting node needs to recreate this port.
  port_descriptor->peer_node_name = port->peer_node_name;
  port_descriptor->peer_port_name = port->peer_port_name;
  port_descriptor->referring_node_name = name_;
  port_descriptor->referring_port_name = local_port_name;
  port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
  port_descriptor->next_sequence_num_to_receive =
      port->message_queue.next_sequence_num();
  port_descriptor->last_sequence_num_to_receive =
      port->last_sequence_num_to_receive;
  port_descriptor->peer_closed = port->peer_closed;
  // Zero the padding so no uninitialized bytes leak onto the wire.
  memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));

  // Configure the local port to point to the new port.
  UpdatePortPeerAddress(local_port_name, port, to_node_name, new_port_name);
}

// Instantiates a local receiving port from a descriptor produced by
// ConvertToProxy on the referring node, registers it, acknowledges the
// referrer with a PortAcceptedEvent, and lets the delegate observe the
// remote peer node if there is one.
int Node::AcceptPort(const PortName& port_name,
                     const Event::PortDescriptor& port_descriptor) {
  RefPtr<Port> port =
      mozilla::MakeRefPtr<Port>(port_descriptor.next_sequence_num_to_send,
                                port_descriptor.next_sequence_num_to_receive);
  port->state = Port::kReceiving;
  port->peer_node_name = port_descriptor.peer_node_name;
  port->peer_port_name = port_descriptor.peer_port_name;
  port->next_control_sequence_num_to_send = kInitialSequenceNum;
  port->next_control_sequence_num_to_receive = kInitialSequenceNum;
  port->prev_node_name = port_descriptor.referring_node_name;
  port->prev_port_name = port_descriptor.referring_port_name;
  port->last_sequence_num_to_receive =
      port_descriptor.last_sequence_num_to_receive;
  port->peer_closed = port_descriptor.peer_closed;

  DVLOG(2) << "Accepting port " << port_name
           << " [peer_closed=" << port->peer_closed
           << "; last_sequence_num_to_receive="
           << port->last_sequence_num_to_receive << "]";

  // A newly accepted port is not signalable until the message referencing the
  // new port finds its way to the consumer (see GetMessage).
  port->message_queue.set_signalable(false);

  int rv = AddPortWithName(port_name, std::move(port));
  if (rv != OK) {
    return rv;
  }

  // Allow referring port to forward messages.
  delegate_->ForwardEvent(port_descriptor.referring_node_name,
                          mozilla::MakeUnique<PortAcceptedEvent>(
                              port_descriptor.referring_port_name,
                              kInvalidPortName, kInvalidSequenceNum));

  if (port_descriptor.peer_node_name != name_) {
    delegate_->ObserveRemoteNode(port_descriptor.peer_node_name);
  }

  return OK;
}

// Stamps |message| with routing metadata and a sequence number on behalf of
// |forwarding_port_ref| (which must be in |expected_port_state|), converting
// any attached ports into proxies when the message leaves this node. On
// success, |*forward_to_node| names the node the caller should deliver to.
// Runs in a retry loop because the peer node may change between the unlocked
// NotifyWillBeRoutedExternally callout and re-acquiring the locks.
int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
                                      Port::State expected_port_state,
                                      bool ignore_closed_peer,
                                      UserMessageEvent* message,
                                      NodeName* forward_to_node) {
  bool target_is_remote = false;
  std::vector<PendingUpdatePreviousPeer> peer_update_events;

  for (;;) {
    NodeName target_node_name;
    {
      SinglePortLocker locker(&forwarding_port_ref);
      target_node_name = locker.port()->peer_node_name;
    }

    // NOTE: This may call out to arbitrary user code, so it's important to call
    // it only while no port locks are held on the calling thread.
    if (target_node_name != name_) {
      if (!message->NotifyWillBeRoutedExternally()) {
        CHROMIUM_LOG(ERROR)
            << "NotifyWillBeRoutedExternally failed unexpectedly.";
        return ERROR_PORT_STATE_UNEXPECTED;
      }
    }

    // Must be held because ConvertToProxy needs to update |peer_port_maps_|.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_locker(ports_lock_);

    // Simultaneously lock the forwarding port as well as all attached ports.
    AutoTArray<PortRef, 4> attached_port_refs;
    AutoTArray<const PortRef*, 5> ports_to_lock;
    attached_port_refs.SetCapacity(message->num_ports());
    ports_to_lock.SetCapacity(message->num_ports() + 1);
    ports_to_lock.AppendElement(&forwarding_port_ref);
    for (size_t i = 0; i < message->num_ports(); ++i) {
      const PortName& attached_port_name = message->ports()[i];
      auto iter = ports_.find(attached_port_name);
      DCHECK(iter != ports_.end());
      attached_port_refs.AppendElement(
          PortRef(attached_port_name, iter->second));
      ports_to_lock.AppendElement(&attached_port_refs[i]);
    }
    PortLocker locker(ports_to_lock.Elements(), ports_to_lock.Length());
    auto* forwarding_port = locker.GetPort(forwarding_port_ref);

    if (forwarding_port->peer_node_name != target_node_name) {
      // The target node has already changed since we last held the lock.
      if (target_node_name == name_) {
        // If the target node was previously this local node, we need to restart
        // the loop, since that means we may now route the message externally.
        continue;
      }

      target_node_name = forwarding_port->peer_node_name;
    }
    target_is_remote = target_node_name != name_;

    if (forwarding_port->state != expected_port_state) {
      return ERROR_PORT_STATE_UNEXPECTED;
    }
    if (forwarding_port->peer_closed && !ignore_closed_peer) {
      return ERROR_PORT_PEER_CLOSED;
    }

    // Messages may already have a sequence number if they're being forwarded by
    // a proxy. Otherwise, use the next outgoing sequence number.
    if (message->sequence_num() == 0) {
      message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
    }
#ifdef DEBUG
    // Build a comma-separated list of attached port names for the DVLOG below.
    std::ostringstream ports_buf;
    for (size_t i = 0; i < message->num_ports(); ++i) {
      if (i > 0) {
        ports_buf << ",";
      }
      ports_buf << message->ports()[i];
    }
#endif

    if (message->num_ports() > 0) {
      // Sanity check to make sure we can actually send all the attached ports.
      // They must all be in the |kReceiving| state and must not be the sender's
      // own peer.
      DCHECK_EQ(message->num_ports(), attached_port_refs.Length());
      for (size_t i = 0; i < message->num_ports(); ++i) {
        auto* attached_port = locker.GetPort(attached_port_refs[i]);
        int error = OK;
        if (attached_port->state != Port::kReceiving) {
          error = ERROR_PORT_STATE_UNEXPECTED;
        } else if (attached_port_refs[i].name() ==
                   forwarding_port->peer_port_name) {
          error = ERROR_PORT_CANNOT_SEND_PEER;
        }

        if (error != OK) {
          // Not going to send. Backpedal on the sequence number.
          forwarding_port->next_sequence_num_to_send--;
          return error;
        }
      }

      if (target_is_remote) {
        // We only bother to proxy and rewrite ports in the event if it's
        // going to be routed to an external node. This substantially reduces
        // the amount of port churn in the system, as many port-carrying
        // events are routed at least 1 or 2 intra-node hops before (if ever)
        // being routed externally.
        Event::PortDescriptor* port_descriptors = message->port_descriptors();
        for (size_t i = 0; i < message->num_ports(); ++i) {
          auto* port = locker.GetPort(attached_port_refs[i]);
          PendingUpdatePreviousPeer update_event = {
              .from_port = attached_port_refs[i].name()};
          ConvertToProxy(port, target_node_name, message->ports() + i,
                         port_descriptors + i, &update_event);
          peer_update_events.push_back(update_event);
        }
      }
    }

#ifdef DEBUG
    DVLOG(4) << "Sending message " << message->sequence_num()
             << " [ports=" << ports_buf.str() << "]"
             << " from " << forwarding_port_ref.name() << "@" << name_ << " to "
             << forwarding_port->peer_port_name << "@" << target_node_name;
#endif

    *forward_to_node = target_node_name;
    message->set_port_name(forwarding_port->peer_port_name);
    message->set_from_port(forwarding_port_ref.name());
    message->set_control_sequence_num(
        forwarding_port->next_control_sequence_num_to_send++);
    break;
  }

  // Now that all locks are dropped, tell the peers of every converted port
  // about their new upstream link.
  for (auto& pending_update_event : peer_update_events) {
    delegate_->ForwardEvent(
        pending_update_event.receiver,
        mozilla::MakeUnique<UpdatePreviousPeerEvent>(
            pending_update_event.port, pending_update_event.from_port,
            pending_update_event.sequence_num,
            pending_update_event.new_prev_node,
            pending_update_event.new_prev_port));
  }

  if (target_is_remote) {
    for (size_t i = 0; i < message->num_ports(); ++i) {
      // For any ports that were converted to proxies above, make sure their
      // prior local peer (if applicable) receives a status update so it can be
      // made aware of its peer's location.
      const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
      if (descriptor.peer_node_name == name_) {
        PortRef local_peer;
        if (GetPort(descriptor.peer_port_name, &local_peer) == OK) {
          delegate_->PortStatusChanged(local_peer);
        }
      }
    }
  }

  return OK;
}

// Transitions a buffering port to proxying: flushes the control messages and
// user messages that accumulated while the remote port was being set up, then
// starts (or finishes) the proxy removal handshake.
int Node::BeginProxying(const PortRef& port_ref) {
  std::vector<std::pair<NodeName, ScopedEvent>> control_message_queue;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kBuffering) {
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    }
    port->state = Port::kProxying;
    // Take the buffered control messages so they can be forwarded without
    // holding the port lock.
    std::swap(port->control_message_queue, control_message_queue);
  }

  for (auto& [control_message_node_name, control_message_event] :
       control_message_queue) {
    delegate_->ForwardEvent(control_message_node_name,
                            std::move(control_message_event));
  }
  control_message_queue.clear();

  int rv = ForwardUserMessagesFromProxy(port_ref);
  if (rv != OK) {
    return rv;
  }

  // Forward any pending acknowledge request.
  MaybeForwardAckRequest(port_ref);

  bool try_remove_proxy_immediately;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kProxying) {
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);
    }

    try_remove_proxy_immediately = port->remove_proxy_on_last_message;
  }

  if (try_remove_proxy_immediately) {
    TryRemoveProxy(port_ref);
  } else {
    InitiateProxyRemoval(port_ref);
  }

  return OK;
}

// Drains a proxying port's queued user messages, re-stamping each one and
// forwarding it toward the port's (new) peer.
int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
  for (;;) {
    // NOTE: We forward messages in sequential order here so that we maintain
    // the message queue's notion of next sequence number.
    // That's useful for the proxy removal process as we can tell when this
    // port has seen all of the messages it is expected to see.
    mozilla::UniquePtr<UserMessageEvent> message;
    {
      SinglePortLocker locker(&port_ref);
      locker.port()->message_queue.GetNextMessage(&message, nullptr);
      if (!message) {
        break;
      }
    }

    NodeName target_node;
    int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
                                         true /* ignore_closed_peer */,
                                         message.get(), &target_node);
    {
      // Mark the message as processed after we ran PrepareToForwardUserMessage.
      // This is important to prevent another thread from deleting the port
      // before we grabbed a sequence number for the message.
      SinglePortLocker locker(&port_ref);
      locker.port()->message_queue.MessageProcessed();
    }
    if (rv != OK) {
      return rv;
    }

    delegate_->ForwardEvent(target_node, std::move(message));
  }
  return OK;
}

// Kicks off removal of a proxying port by announcing it to the connected
// graph with an ObserveProxyEvent. No-op if the port was already closed.
void Node::InitiateProxyRemoval(const PortRef& port_ref) {
  NodeName peer_node_name;
  PortName peer_port_name;
  uint64_t sequence_num;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state == Port::kClosed) {
      return;
    }
    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;
    sequence_num = port->next_control_sequence_num_to_send++;
    DCHECK_EQ(port->state, Port::kProxying);
  }

  // To remove this node, we start by notifying the connected graph that we are
  // a proxy. This allows whatever port is referencing this node to skip it.
  // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
  // the peer was closed in the meantime).
  delegate_->ForwardEvent(
      peer_node_name, mozilla::MakeUnique<ObserveProxyEvent>(
                          peer_port_name, port_ref.name(), sequence_num, name_,
                          port_ref.name(), peer_node_name, peer_port_name));
}

// Erases a proxying port once it is safe to do so: the proxy must have seen
// ObserveProxyAck (|remove_proxy_on_last_message|) and must have forwarded
// every message it can still expect. Before erasure, the downstream peer is
// told (via UpdatePreviousPeerEvent) to accept traffic directly from this
// proxy's previous node, and any deferred |send_on_proxy_removal| event is
// forwarded last.
void Node::TryRemoveProxy(const PortRef& port_ref) {
  bool should_erase = false;
  NodeName removal_target_node;
  ScopedEvent removal_event;
  PendingUpdatePreviousPeer pending_update_event;

  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state == Port::kClosed) {
      return;
    }
    DCHECK_EQ(port->state, Port::kProxying);

    // Make sure we have seen ObserveProxyAck before removing the port.
    if (!port->remove_proxy_on_last_message) {
      return;
    }

    if (!CanAcceptMoreMessages(port)) {
      DCHECK_EQ(port->message_queue.queued_message_count(), 0lu);
      should_erase = true;
      if (port->send_on_proxy_removal) {
        removal_target_node = port->send_on_proxy_removal->first;
        removal_event = std::move(port->send_on_proxy_removal->second);
        if (removal_event) {
          removal_event->set_control_sequence_num(
              port->next_control_sequence_num_to_send++);
          DCHECK_EQ(removal_target_node, port->peer_node_name);
          DCHECK_EQ(removal_event->port_name(), port->peer_port_name);
        }
      }
      // Tell the peer_node to accept messages from prev_node from now.
      pending_update_event = {
          .receiver = port->peer_node_name,
          .port = port->peer_port_name,
          .from_port = port_ref.name(),
          .sequence_num = port->next_control_sequence_num_to_send++,
          .new_prev_node = port->prev_node_name,
          .new_prev_port = port->prev_port_name};
    } else {
      DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
               << " now; waiting for more messages";
    }
  }

  if (should_erase) {
    // Order matters: the peer must learn its new previous link before this
    // port disappears from the node.
    delegate_->ForwardEvent(
        pending_update_event.receiver,
        mozilla::MakeUnique<UpdatePreviousPeerEvent>(
            pending_update_event.port, pending_update_event.from_port,
            pending_update_event.sequence_num,
            pending_update_event.new_prev_node,
            pending_update_event.new_prev_port));
    ErasePort(port_ref.name());
  }

  if (removal_event) {
    delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
  }
}

void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
                                   const PortName& port_name) {
  // Wipes out all ports whose peer node matches |node_name| and whose peer port
  // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
  // node is matched.

  std::vector<PortRef> ports_to_notify;
  std::vector<PortName> dead_proxies_to_broadcast;
  std::vector<mozilla::UniquePtr<UserMessageEvent>> undelivered_messages;
  std::vector<std::pair<NodeName, ScopedEvent>> closure_events;

  {
    // Collection phase: gather all work under |ports_lock_| plus per-port
    // locks. Event forwarding and notifications happen after locks are
    // dropped.
    PortLocker::AssertNoPortsLockedOnCurrentThread();
    mozilla::MutexAutoLock ports_lock(ports_lock_);

    auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
    if (node_peer_port_map_iter == peer_port_maps_.end()) {
      // No local port references |node_name| at all.
      return;
    }

    auto& node_peer_port_map = node_peer_port_map_iter->second;
    auto peer_ports_begin = node_peer_port_map.begin();
    auto peer_ports_end = node_peer_port_map.end();
    if (port_name != kInvalidPortName) {
      // If |port_name| is given, we limit the set of local ports to the ones
      // with that specific port as their peer.
      peer_ports_begin = node_peer_port_map.find(port_name);
      if (peer_ports_begin == node_peer_port_map.end()) {
        return;
      }

      // Narrow the iteration range to exactly that one entry.
      peer_ports_end = peer_ports_begin;
      ++peer_ports_end;
    }

    for (auto peer_port_iter = peer_ports_begin;
         peer_port_iter != peer_ports_end; ++peer_port_iter) {
      auto& local_ports = peer_port_iter->second;
      // NOTE: This inner loop almost always has only one element. There are
      // relatively short-lived cases where more than one local port points to
      // the same peer, and this only happens when extra ports are bypassed
      // proxies waiting to be torn down.
      for (auto& local_port : local_ports) {
        auto& local_port_ref = local_port.second;

        SinglePortLocker locker(&local_port_ref);
        auto* port = locker.port();

        if (port_name != kInvalidPortName) {
          // If this is a targeted observe dead proxy event, send out an
          // ObserveClosure to acknowledge it.
          closure_events.push_back(
              std::pair{port->peer_node_name,
                        mozilla::MakeUnique<ObserveClosureEvent>(
                            port->peer_port_name, local_port_ref.name(),
                            port->next_control_sequence_num_to_send++,
                            port->last_sequence_num_to_receive)});
        }

        if (!port->peer_closed) {
          // Treat this as immediate peer closure. It's an exceptional
          // condition akin to a broken pipe, so we don't care about losing
          // messages.

          port->peer_closed = true;
          port->peer_lost_unexpectedly = true;
          // Only kReceiving ports surface a status change to the delegate.
          if (port->state == Port::kReceiving) {
            ports_to_notify.push_back(local_port_ref);
          }
        }

        // We don't expect to forward any further messages, and we don't
        // expect to receive a Port{Accepted,Rejected} event. Because we're
        // a proxy with no active peer, we cannot use the normal proxy removal
        // procedure of forward-propagating an ObserveProxy. Instead we
        // broadcast our own death so it can be back-propagated. This is
        // inefficient but rare.
        if (port->state == Port::kBuffering || port->state == Port::kProxying) {
          port->state = Port::kClosed;
          dead_proxies_to_broadcast.push_back(local_port_ref.name());
          // Salvage anything still queued or pending so ports carried by
          // those messages can be closed once locks are released.
          std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
          port->message_queue.TakeAllMessages(&messages);
          port->TakePendingMessages(messages);
          for (auto& message : messages) {
            undelivered_messages.emplace_back(std::move(message));
          }
        }
      }
    }
  }

  // Locks released; deliver the ObserveClosure acknowledgements collected
  // above.
  for (auto& [closure_event_target_node, closure_event] : closure_events) {
    delegate_->ForwardEvent(closure_event_target_node,
                            std::move(closure_event));
  }

  // Wake up any receiving ports who have just observed simulated peer closure.
1957 for (const auto& port : ports_to_notify) { 1958 delegate_->PortStatusChanged(port); 1959 } 1960 1961 for (const auto& proxy_name : dead_proxies_to_broadcast) { 1962 // Broadcast an event signifying that this proxy is no longer functioning. 1963 delegate_->BroadcastEvent(mozilla::MakeUnique<ObserveProxyEvent>( 1964 kInvalidPortName, kInvalidPortName, kInvalidSequenceNum, name_, 1965 proxy_name, kInvalidNodeName, kInvalidPortName)); 1966 1967 // Also process death locally since the port that points this closed one 1968 // could be on the current node. 1969 // Note: Although this is recursive, only a single port is involved which 1970 // limits the expected branching to 1. 1971 DestroyAllPortsWithPeer(name_, proxy_name); 1972 } 1973 1974 // Close any ports referenced by undelivered messages. 1975 for (const auto& message : undelivered_messages) { 1976 for (size_t i = 0; i < message->num_ports(); ++i) { 1977 PortRef ref; 1978 if (GetPort(message->ports()[i], &ref) == OK) { 1979 ClosePort(ref); 1980 } 1981 } 1982 } 1983 } 1984 1985 void Node::UpdatePortPeerAddress(const PortName& local_port_name, 1986 Port* local_port, 1987 const NodeName& new_peer_node, 1988 const PortName& new_peer_port) { 1989 ports_lock_.AssertCurrentThreadOwns(); 1990 local_port->AssertLockAcquired(); 1991 1992 RemoveFromPeerPortMap(local_port_name, local_port); 1993 local_port->peer_node_name = new_peer_node; 1994 local_port->peer_port_name = new_peer_port; 1995 local_port->next_control_sequence_num_to_send = kInitialSequenceNum; 1996 if (new_peer_port != kInvalidPortName) { 1997 peer_port_maps_[new_peer_node][new_peer_port].emplace( 1998 local_port_name, PortRef(local_port_name, RefPtr<Port>{local_port})); 1999 } 2000 } 2001 2002 void Node::RemoveFromPeerPortMap(const PortName& local_port_name, 2003 Port* local_port) { 2004 if (local_port->peer_port_name == kInvalidPortName) { 2005 return; 2006 } 2007 2008 auto node_iter = peer_port_maps_.find(local_port->peer_node_name); 2009 if (node_iter 
== peer_port_maps_.end()) { 2010 return; 2011 } 2012 2013 auto& node_peer_port_map = node_iter->second; 2014 auto ports_iter = node_peer_port_map.find(local_port->peer_port_name); 2015 if (ports_iter == node_peer_port_map.end()) { 2016 return; 2017 } 2018 2019 auto& local_ports_with_this_peer = ports_iter->second; 2020 local_ports_with_this_peer.erase(local_port_name); 2021 if (local_ports_with_this_peer.empty()) { 2022 node_peer_port_map.erase(ports_iter); 2023 } 2024 if (node_peer_port_map.empty()) { 2025 peer_port_maps_.erase(node_iter); 2026 } 2027 } 2028 2029 void Node::SwapPortPeers(const PortName& port0_name, Port* port0, 2030 const PortName& port1_name, Port* port1) { 2031 ports_lock_.AssertCurrentThreadOwns(); 2032 port0->AssertLockAcquired(); 2033 port1->AssertLockAcquired(); 2034 2035 auto& peer0_ports = 2036 peer_port_maps_[port0->peer_node_name][port0->peer_port_name]; 2037 auto& peer1_ports = 2038 peer_port_maps_[port1->peer_node_name][port1->peer_port_name]; 2039 peer0_ports.erase(port0_name); 2040 peer1_ports.erase(port1_name); 2041 peer0_ports.emplace(port1_name, PortRef(port1_name, RefPtr<Port>{port1})); 2042 peer1_ports.emplace(port0_name, PortRef(port0_name, RefPtr<Port>{port0})); 2043 2044 std::swap(port0->peer_node_name, port1->peer_node_name); 2045 std::swap(port0->peer_port_name, port1->peer_port_name); 2046 } 2047 2048 void Node::MaybeResendAckRequest(const PortRef& port_ref) { 2049 NodeName peer_node_name; 2050 ScopedEvent ack_request_event; 2051 { 2052 SinglePortLocker locker(&port_ref); 2053 auto* port = locker.port(); 2054 if (port->state != Port::kReceiving) { 2055 return; 2056 } 2057 2058 if (!port->sequence_num_acknowledge_interval) { 2059 return; 2060 } 2061 2062 peer_node_name = port->peer_node_name; 2063 ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>( 2064 port->peer_port_name, port_ref.name(), 2065 port->next_control_sequence_num_to_send++, 2066 port->last_sequence_num_acknowledged + 2067 
port->sequence_num_acknowledge_interval);
  }

  // Forward outside the port lock.
  delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
}

// Forwards a pending ack request along a proxying port to its peer, if one is
// pending (|sequence_num_to_acknowledge| non-zero). The pending value is
// cleared once the event has been built.
void Node::MaybeForwardAckRequest(const PortRef& port_ref) {
  NodeName peer_node_name;
  ScopedEvent ack_request_event;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    // Only proxies forward ack requests.
    if (port->state != Port::kProxying) {
      return;
    }

    if (!port->sequence_num_to_acknowledge) {
      return;
    }

    peer_node_name = port->peer_node_name;
    ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
        port->peer_port_name, port_ref.name(),
        port->next_control_sequence_num_to_send++,
        port->sequence_num_to_acknowledge);

    // Consumed: nothing left pending on this proxy.
    port->sequence_num_to_acknowledge = 0;
  }

  // Forward outside the port lock.
  delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
}

// Re-sends a UserMessageReadAckEvent reporting the last sequence number this
// receiving port has read, provided an ack is outstanding
// (|sequence_num_to_acknowledge| non-zero) and at least one message has been
// read.
void Node::MaybeResendAck(const PortRef& port_ref) {
  NodeName peer_node_name;
  ScopedEvent ack_event;
  {
    SinglePortLocker locker(&port_ref);
    auto* port = locker.port();
    if (port->state != Port::kReceiving) {
      return;
    }

    // The last message read is one before the next expected sequence number.
    // A result of 0 is taken to mean nothing has been read yet — presumably
    // sequence numbers start at 1 (cf. kInitialSequenceNum); confirm.
    uint64_t last_sequence_num_read =
        port->message_queue.next_sequence_num() - 1;
    if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) {
      return;
    }

    peer_node_name = port->peer_node_name;
    ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(
        port->peer_port_name, port_ref.name(),
        port->next_control_sequence_num_to_send++, last_sequence_num_read);
  }

  // Forward outside the port lock.
  delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
}

// Holds the NodeDelegate pointer on behalf of |node|. In DEBUG builds,
// EnsureSafeDelegateAccess() below provides a lock-discipline check;
// presumably it is invoked on each delegate access — confirm against the
// class definition in node.h.
Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
    : node_(node), delegate_(delegate) {
  DCHECK(node_);
}

Node::DelegateHolder::~DelegateHolder() = default;

#ifdef DEBUG
// Deadlock check: asserting no port locks are held, then briefly acquiring
// |ports_lock_|, verifies the calling thread reaches the delegate without
// holding either kind of lock.
void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
  PortLocker::AssertNoPortsLockedOnCurrentThread();
  mozilla::MutexAutoLock lock(node_->ports_lock_);
}
#endif

}  // namespace ports
}  // namespace core
}  // namespace mojo