NodeController.cpp (32434B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 5 * You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "mozilla/ipc/NodeController.h" 8 #include "MainThreadUtils.h" 9 #include "base/process_util.h" 10 #include "chrome/common/ipc_message.h" 11 #include "mojo/core/ports/name.h" 12 #include "mojo/core/ports/node.h" 13 #include "mojo/core/ports/port_locker.h" 14 #include "mozilla/AlreadyAddRefed.h" 15 #include "mozilla/RandomNum.h" 16 #include "mozilla/StaticPtr.h" 17 #include "mozilla/Assertions.h" 18 #include "mozilla/ToString.h" 19 #include "mozilla/ipc/IOThread.h" 20 #include "mozilla/ipc/ProtocolUtils.h" 21 #include "mozilla/mozalloc.h" 22 #include "nsISerialEventTarget.h" 23 #include "nsTArray.h" 24 #include "nsXULAppAPI.h" 25 #include "nsPrintfCString.h" 26 27 #define PORTS_ALWAYS_OK(expr) MOZ_ALWAYS_TRUE(mojo::core::ports::OK == (expr)) 28 29 namespace mozilla::ipc { 30 31 static StaticRefPtr<NodeController> gNodeController; 32 33 static LazyLogModule gNodeControllerLog{"NodeController"}; 34 35 // Helper logger macro which includes the name of the `this` NodeController in 36 // the logged messages. 37 #define NODECONTROLLER_LOG(level_, fmt_, ...) \ 38 MOZ_LOG(gNodeControllerLog, level_, \ 39 ("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__)) 40 41 // Helper warning macro which both does logger logging and emits NS_WARNING logs 42 // under debug mode. 43 #ifdef DEBUG 44 # define NODECONTROLLER_WARNING(fmt_, ...) \ 45 do { \ 46 nsPrintfCString warning("[%s]: " fmt_, ToString(mName).c_str(), \ 47 ##__VA_ARGS__); \ 48 NS_WARNING(warning.get()); \ 49 MOZ_LOG(gNodeControllerLog, LogLevel::Debug, ("%s", warning.get())); \ 50 } while (0) 51 #else 52 # define NODECONTROLLER_WARNING(fmt_, ...) \ 53 NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__) 54 #endif 55 56 NodeController::NodeController(const NodeName& aName, 57 const IPC::Channel::ChannelKind* aChannelKind) 58 : mName(aName), 59 mNode(MakeUnique<Node>(aName, this)), 60 mChannelKind(aChannelKind) {} 61 62 NodeController::~NodeController() { 63 auto state = mState.Lock(); 64 MOZ_RELEASE_ASSERT(state->mPeers.IsEmpty(), 65 "Destroying NodeController before closing all peers"); 66 MOZ_RELEASE_ASSERT(state->mInvites.IsEmpty(), 67 "Destroying NodeController before closing all invites"); 68 }; 69 70 // FIXME: Actually provide some way to create the thing. 71 /* static */ NodeController* NodeController::GetSingleton() { 72 MOZ_ASSERT(gNodeController); 73 return gNodeController; 74 } 75 76 std::pair<ScopedPort, ScopedPort> NodeController::CreatePortPair() { 77 PortRef port0, port1; 78 PORTS_ALWAYS_OK(mNode->CreatePortPair(&port0, &port1)); 79 return {ScopedPort{std::move(port0), this}, 80 ScopedPort{std::move(port1), this}}; 81 } 82 83 auto NodeController::GetPort(const PortName& aName) -> PortRef { 84 PortRef port; 85 int rv = mNode->GetPort(aName, &port); 86 if (NS_WARN_IF(rv != mojo::core::ports::OK)) { 87 NODECONTROLLER_WARNING("Call to GetPort(%s) Failed", 88 ToString(aName).c_str()); 89 return {}; 90 } 91 return port; 92 } 93 94 void NodeController::SetPortObserver(const PortRef& aPort, 95 PortObserver* aObserver) { 96 PORTS_ALWAYS_OK(mNode->SetUserData(aPort, aObserver)); 97 } 98 99 auto NodeController::GetStatus(const PortRef& aPort) -> Maybe<PortStatus> { 100 PortStatus status{}; 101 int rv = mNode->GetStatus(aPort, &status); 102 if (rv != mojo::core::ports::OK) { 103 return Nothing(); 104 } 105 return Some(status); 106 } 107 108 void NodeController::ClosePort(const PortRef& aPort) { 109 PORTS_ALWAYS_OK(mNode->ClosePort(aPort)); 110 } 111 112 bool NodeController::GetMessage(const PortRef& aPort, 113 UniquePtr<IPC::Message>* aMessage) { 114 UniquePtr<UserMessageEvent> messageEvent; 115 int rv = mNode->GetMessage(aPort, &messageEvent, nullptr); 116 if (rv != mojo::core::ports::OK) { 117 if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) { 118 return false; 119 } 120 MOZ_CRASH("GetMessage on port in invalid state"); 121 } 122 123 if (messageEvent) { 124 UniquePtr<IPC::Message> message = messageEvent->TakeMessage<IPC::Message>(); 125 126 // If our UserMessageEvent has any ports directly attached to it, fetch them 127 // from our node and attach them to the IPC::Message we extracted. 128 // 129 // It's important to only do this if we have nonempty set of ports on the 130 // event, as we may have never serialized our IPC::Message's ports onto the 131 // event if it was routed in-process. 132 if (messageEvent->num_ports() > 0) { 133 nsTArray<ScopedPort> attachedPorts(messageEvent->num_ports()); 134 for (size_t i = 0; i < messageEvent->num_ports(); ++i) { 135 attachedPorts.AppendElement( 136 ScopedPort{GetPort(messageEvent->ports()[i]), this}); 137 } 138 message->SetAttachedPorts(std::move(attachedPorts)); 139 } 140 141 *aMessage = std::move(message); 142 } else { 143 *aMessage = nullptr; 144 } 145 return true; 146 } 147 148 bool NodeController::SendUserMessage(const PortRef& aPort, 149 UniquePtr<IPC::Message> aMessage) { 150 auto messageEvent = MakeUnique<UserMessageEvent>(0); 151 messageEvent->AttachMessage(std::move(aMessage)); 152 153 int rv = mNode->SendUserMessage(aPort, std::move(messageEvent)); 154 if (rv == mojo::core::ports::OK) { 155 return true; 156 } 157 if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) { 158 NODECONTROLLER_LOG(LogLevel::Debug, 159 "Ignoring message to port %s as peer was closed", 160 ToString(aPort.name()).c_str()); 161 return true; 162 } 163 NODECONTROLLER_WARNING("Failed to send message to port %s", 164 ToString(aPort.name()).c_str()); 165 return false; 166 } 167 168 auto NodeController::SerializeEventMessage(UniquePtr<Event> aEvent, 169 const NodeName* aRelayTarget, 170 uint32_t aType) 171 -> UniquePtr<IPC::Message> { 172 UniquePtr<IPC::Message> message; 173 if (aEvent->type() == Event::kUserMessage) { 174 MOZ_DIAGNOSTIC_ASSERT( 175 aType == EVENT_MESSAGE_TYPE, 176 "Can only send a UserMessage in an EVENT_MESSAGE_TYPE"); 177 message = static_cast<UserMessageEvent*>(aEvent.get()) 178 ->TakeMessage<IPC::Message>(); 179 } else { 180 message = MakeUnique<IPC::Message>(MSG_ROUTING_CONTROL, aType); 181 } 182 183 message->set_relay(aRelayTarget != nullptr); 184 185 size_t length = aEvent->GetSerializedSize(); 186 if (aRelayTarget) { 187 length += sizeof(NodeName); 188 } 189 190 // Use an intermediate buffer to serialize to avoid potential issues with the 191 // segmented `IPC::Message` bufferlist. This should be fairly cheap, as the 192 // majority of events are fairly small. 193 Vector<char, 256, InfallibleAllocPolicy> buffer; 194 (void)buffer.initLengthUninitialized(length); 195 if (aRelayTarget) { 196 memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName)); 197 aEvent->Serialize(buffer.begin() + sizeof(NodeName)); 198 } else { 199 aEvent->Serialize(buffer.begin()); 200 } 201 202 message->WriteFooter(buffer.begin(), buffer.length()); 203 message->set_event_footer_size(buffer.length()); 204 205 #ifdef DEBUG 206 // Debug-assert that we can read the same data back out of the buffer. 207 MOZ_ASSERT(message->event_footer_size() == length); 208 Vector<char, 256, InfallibleAllocPolicy> buffer2; 209 (void)buffer2.initLengthUninitialized(message->event_footer_size()); 210 MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(), 211 /* truncate */ false)); 212 MOZ_ASSERT(!memcmp(buffer2.begin(), buffer.begin(), buffer.length())); 213 #endif 214 215 return message; 216 } 217 218 auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage, 219 NodeName* aRelayTarget) 220 -> UniquePtr<Event> { 221 if (aMessage->is_relay() && !aRelayTarget) { 222 NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage->name()); 223 return nullptr; 224 } 225 226 Vector<char, 256, InfallibleAllocPolicy> buffer; 227 (void)buffer.initLengthUninitialized(aMessage->event_footer_size()); 228 // Truncate the message when reading the footer, so that the extra footer data 229 // is no longer present in the message. This allows future code to eventually 230 // send the same `IPC::Message` to another process. 231 if (!aMessage->ReadFooter(buffer.begin(), buffer.length(), 232 /* truncate */ true)) { 233 NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed", 234 aMessage->name()); 235 return nullptr; 236 } 237 aMessage->set_event_footer_size(0); 238 239 UniquePtr<Event> event; 240 if (aRelayTarget) { 241 MOZ_ASSERT(aMessage->is_relay()); 242 if (buffer.length() < sizeof(NodeName)) { 243 NODECONTROLLER_WARNING( 244 "Insufficient space in message footer for message '%s'", 245 aMessage->name()); 246 return nullptr; 247 } 248 memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName)); 249 event = Event::Deserialize(buffer.begin() + sizeof(NodeName), 250 buffer.length() - sizeof(NodeName)); 251 } else { 252 event = Event::Deserialize(buffer.begin(), buffer.length()); 253 } 254 255 if (!event) { 256 NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed", 257 aMessage->name()); 258 return nullptr; 259 } 260 261 if (event->type() == Event::kUserMessage) { 262 static_cast<UserMessageEvent*>(event.get()) 263 ->AttachMessage(std::move(aMessage)); 264 } 265 return event; 266 } 267 268 already_AddRefed<NodeChannel> NodeController::GetNodeChannel( 269 const NodeName& aName) { 270 auto state = mState.Lock(); 271 return do_AddRef(state->mPeers.Get(aName)); 272 } 273 274 void NodeController::DropPeer(NodeName aNodeName) { 275 AssertIOThread(); 276 277 #ifdef FUZZING_SNAPSHOT 278 MOZ_FUZZING_IPC_DROP_PEER("NodeController::DropPeer"); 279 #endif 280 281 Invite invite; 282 RefPtr<NodeChannel> channel; 283 nsTArray<PortRef> pendingMerges; 284 { 285 auto state = mState.Lock(); 286 state->mPeers.Remove(aNodeName, &channel); 287 state->mPendingMessages.Remove(aNodeName); 288 state->mInvites.Remove(aNodeName, &invite); 289 state->mPendingMerges.Remove(aNodeName, &pendingMerges); 290 } 291 292 NODECONTROLLER_LOG(LogLevel::Info, "Dropping Peer %s (pid: %" PRIPID ")", 293 ToString(aNodeName).c_str(), 294 channel ? channel->OtherPid() : base::kInvalidProcessId); 295 296 if (channel) { 297 channel->Close(); 298 } 299 if (invite.mChannel) { 300 invite.mChannel->Close(); 301 } 302 if (invite.mToMerge.is_valid()) { 303 // Ignore any possible errors here. 304 (void)mNode->ClosePort(invite.mToMerge); 305 } 306 for (auto& port : pendingMerges) { 307 // Ignore any possible errors here. 308 (void)mNode->ClosePort(port); 309 } 310 mNode->LostConnectionToNode(aNodeName); 311 } 312 313 void NodeController::ContactRemotePeer(const NodeName& aNode, 314 UniquePtr<Event> aEvent) { 315 // On Windows and macOS, messages holding HANDLEs or mach ports must be 316 // relayed via the broker process so it can transfer ownership. 317 bool needsRelay = false; 318 #if defined(XP_WIN) || defined(XP_DARWIN) 319 if (aEvent && !IsBroker() && aNode != kBrokerNodeName && 320 aEvent->type() == Event::kUserMessage) { 321 auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get()); 322 needsRelay = userEvent->HasMessage() && 323 mChannelKind->num_relayed_attachments( 324 *userEvent->GetMessage<IPC::Message>()) > 0; 325 } 326 #endif 327 328 UniquePtr<IPC::Message> message; 329 if (aEvent) { 330 message = 331 SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr); 332 MOZ_ASSERT(message->is_relay() == needsRelay, 333 "Message relay status set incorrectly"); 334 } 335 336 RefPtr<NodeChannel> peer; 337 RefPtr<NodeChannel> broker; 338 bool needsIntroduction = false; 339 bool needsBroker = needsRelay; 340 { 341 auto state = mState.Lock(); 342 343 // Check if we know this peer. If we don't, we'll need to request an 344 // introduction. 345 peer = state->mPeers.Get(aNode); 346 if (!peer) { 347 // We don't know the peer, check if we've already requested an 348 // introduction, or if we need to request a new one. 349 auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() { 350 needsIntroduction = true; 351 needsBroker = true; 352 return Queue<UniquePtr<IPC::Message>, 64>{}; 353 }); 354 // If we aren't relaying, queue up the message to be sent. 355 if (message && !needsRelay) { 356 queue.Push(std::move(message)); 357 } 358 } 359 360 if (needsBroker && !IsBroker()) { 361 broker = state->mPeers.Get(kBrokerNodeName); 362 } 363 } 364 365 if (needsBroker && !broker) { 366 NODECONTROLLER_WARNING( 367 "Dropping message '%s'; no connection to unknown peer %s", 368 message ? message->name() : "<null>", ToString(aNode).c_str()); 369 if (needsIntroduction) { 370 // We have no broker and will never be able to be introduced to this node. 371 // Queue a task to clean up any ports connected to it. 372 XRE_GetAsyncIOEventTarget()->Dispatch(NewRunnableMethod<NodeName>( 373 "NodeController::DropPeer", this, &NodeController::DropPeer, aNode)); 374 } 375 return; 376 } 377 378 if (needsIntroduction) { 379 NODECONTROLLER_LOG(LogLevel::Info, "Requesting introduction to peer %s", 380 ToString(aNode).c_str()); 381 broker->RequestIntroduction(aNode); 382 } 383 384 if (message) { 385 if (needsRelay) { 386 NODECONTROLLER_LOG(LogLevel::Info, 387 "Relaying message '%s' for peer %s due to %" PRIu32 388 " attachments", 389 message->name(), ToString(aNode).c_str(), 390 mChannelKind->num_relayed_attachments(*message)); 391 MOZ_ASSERT(mChannelKind->num_relayed_attachments(*message) > 0 && broker); 392 broker->SendEventMessage(std::move(message)); 393 } else if (peer) { 394 peer->SendEventMessage(std::move(message)); 395 } 396 } 397 } 398 399 void NodeController::ForwardEvent(const NodeName& aNode, 400 UniquePtr<Event> aEvent) { 401 MOZ_ASSERT(aEvent, "cannot forward null event"); 402 if (aNode == mName) { 403 (void)mNode->AcceptEvent(mName, std::move(aEvent)); 404 } else { 405 ContactRemotePeer(aNode, std::move(aEvent)); 406 } 407 } 408 409 void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) { 410 UniquePtr<IPC::Message> message = 411 SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE); 412 413 if (IsBroker()) { 414 OnBroadcast(mName, std::move(message)); 415 } else if (RefPtr<NodeChannel> broker = GetNodeChannel(kBrokerNodeName)) { 416 broker->Broadcast(std::move(message)); 417 } else { 418 NODECONTROLLER_WARNING( 419 "Trying to broadcast event, but no connection to broker"); 420 } 421 } 422 423 void NodeController::PortStatusChanged(const PortRef& aPortRef) { 424 RefPtr<UserData> userData; 425 int rv = mNode->GetUserData(aPortRef, &userData); 426 if (rv != mojo::core::ports::OK) { 427 NODECONTROLLER_WARNING("GetUserData call for port '%s' failed", 428 ToString(aPortRef.name()).c_str()); 429 return; 430 } 431 if (userData) { 432 // All instances of `UserData` attached to ports in this node must be of 433 // type `PortObserver`, so we can call `OnPortStatusChanged` directly on 434 // them. 435 static_cast<PortObserver*>(userData.get())->OnPortStatusChanged(); 436 } 437 } 438 439 void NodeController::ObserveRemoteNode(const NodeName& aNode) { 440 MOZ_ASSERT(aNode != mName); 441 ContactRemotePeer(aNode, nullptr); 442 } 443 444 void NodeController::OnEventMessage(const NodeName& aFromNode, 445 UniquePtr<IPC::Message> aMessage) { 446 AssertIOThread(); 447 448 bool isRelay = aMessage->is_relay(); 449 if (isRelay && mChannelKind->num_relayed_attachments(*aMessage) == 0) { 450 NODECONTROLLER_WARNING( 451 "Invalid relay message without relayed attachments from peer %s!", 452 ToString(aFromNode).c_str()); 453 DropPeer(aFromNode); 454 return; 455 } 456 457 NodeName relayTarget; 458 UniquePtr<Event> event = DeserializeEventMessage( 459 std::move(aMessage), isRelay ? &relayTarget : nullptr); 460 if (!event) { 461 NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!", 462 ToString(aFromNode).c_str()); 463 DropPeer(aFromNode); 464 return; 465 } 466 467 NodeName fromNode = aFromNode; 468 #if defined(XP_WIN) || defined(XP_DARWIN) 469 if (isRelay) { 470 if (event->type() != Event::kUserMessage) { 471 NODECONTROLLER_WARNING( 472 "Unexpected relay of non-UserMessage event from peer %s!", 473 ToString(aFromNode).c_str()); 474 DropPeer(aFromNode); 475 return; 476 } 477 478 // If we're the broker, then we'll need to forward this message on to the 479 // true recipient. To do this, we re-serialize the message, passing along 480 // the original source node, and send it to the final node. 481 if (IsBroker()) { 482 UniquePtr<IPC::Message> message = 483 SerializeEventMessage(std::move(event), &aFromNode); 484 if (!message) { 485 NODECONTROLLER_WARNING( 486 "Relaying EventMessage from peer %s failed to re-serialize!", 487 ToString(aFromNode).c_str()); 488 DropPeer(aFromNode); 489 return; 490 } 491 MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?"); 492 MOZ_ASSERT(mChannelKind->num_relayed_attachments(*message) > 0, 493 "Message doesn't have relayed attachments?"); 494 495 NODECONTROLLER_LOG( 496 LogLevel::Info, 497 "Relaying message '%s' from peer %s to peer %s (%" PRIu32 498 " attachments)", 499 message->name(), ToString(aFromNode).c_str(), 500 ToString(relayTarget).c_str(), 501 mChannelKind->num_relayed_attachments(*message)); 502 503 RefPtr<NodeChannel> peer; 504 { 505 auto state = mState.Lock(); 506 peer = state->mPeers.Get(relayTarget); 507 } 508 if (!peer) { 509 NODECONTROLLER_WARNING( 510 "Dropping relayed message from %s to unknown peer %s", 511 ToString(aFromNode).c_str(), ToString(relayTarget).c_str()); 512 return; 513 } 514 515 peer->SendEventMessage(std::move(message)); 516 return; 517 } 518 519 // Otherwise, we're the final recipient, so we can continue & process the 520 // message as usual. 521 if (aFromNode != kBrokerNodeName) { 522 NODECONTROLLER_WARNING( 523 "Unexpected relayed EventMessage from non-broker peer %s!", 524 ToString(aFromNode).c_str()); 525 DropPeer(aFromNode); 526 return; 527 } 528 fromNode = relayTarget; 529 530 NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s", 531 ToString(fromNode).c_str()); 532 } 533 #endif 534 535 // If we're getting a requested port merge from another process, check to make 536 // sure that we're expecting the request, and record that the merge has 537 // arrived so we don't try to close the port on error. 538 if (event->type() == Event::kMergePort) { 539 // Check that the target port for the merge actually exists. 540 auto targetPort = GetPort(event->port_name()); 541 if (!targetPort.is_valid()) { 542 NODECONTROLLER_WARNING( 543 "Unexpected MergePortEvent from peer %s for unknown port %s", 544 ToString(fromNode).c_str(), ToString(event->port_name()).c_str()); 545 DropPeer(fromNode); 546 return; 547 } 548 549 // Check if `targetPort` is in our pending merges entry for the given source 550 // node. If this makes the `mPendingMerges` entry empty, remove it. 551 bool expectingMerge = [&] { 552 auto state = mState.Lock(); 553 auto pendingMerges = state->mPendingMerges.Lookup(aFromNode); 554 if (!pendingMerges) { 555 return false; 556 } 557 size_t removed = pendingMerges->RemoveElementsBy( 558 [&](auto& port) { return port.name() == targetPort.name(); }); 559 if (removed != 0 && pendingMerges->IsEmpty()) { 560 pendingMerges.Remove(); 561 } 562 return removed != 0; 563 }(); 564 565 if (!expectingMerge) { 566 NODECONTROLLER_WARNING( 567 "Unexpected MergePortEvent from peer %s for port %s", 568 ToString(fromNode).c_str(), ToString(event->port_name()).c_str()); 569 DropPeer(fromNode); 570 return; 571 } 572 } 573 574 (void)mNode->AcceptEvent(fromNode, std::move(event)); 575 } 576 577 void NodeController::OnBroadcast(const NodeName& aFromNode, 578 UniquePtr<IPC::Message> aMessage) { 579 MOZ_DIAGNOSTIC_ASSERT(aMessage->type() == BROADCAST_MESSAGE_TYPE); 580 581 // NOTE: This method may be called off of the IO thread by the 582 // `BroadcastEvent` node callback. 583 if (!IsBroker()) { 584 NODECONTROLLER_WARNING("Broadcast request received by non-broker node"); 585 return; 586 } 587 588 UniquePtr<Event> event = DeserializeEventMessage(std::move(aMessage)); 589 if (!event) { 590 NODECONTROLLER_WARNING("Invalid broadcast message from peer"); 591 return; 592 } 593 594 nsTArray<RefPtr<NodeChannel>> peers; 595 { 596 auto state = mState.Lock(); 597 peers.SetCapacity(state->mPeers.Count()); 598 for (const auto& peer : state->mPeers.Values()) { 599 peers.AppendElement(peer); 600 } 601 } 602 for (const auto& peer : peers) { 603 // NOTE: This `clone` operation is only supported for a limited number of 604 // message types by the ports API, which provides some extra security by 605 // only allowing those specific types of messages to be broadcasted. 606 // Messages which don't support `CloneForBroadcast` cannot be broadcast, and 607 // the ports library will not attempt to broadcast them. 608 auto clone = event->CloneForBroadcast(); 609 if (!clone) { 610 NODECONTROLLER_WARNING("Attempt to broadcast unsupported message"); 611 break; 612 } 613 614 peer->SendEventMessage(SerializeEventMessage(std::move(clone))); 615 } 616 } 617 618 void NodeController::OnIntroduce(const NodeName& aFromNode, 619 NodeChannel::Introduction aIntroduction) { 620 AssertIOThread(); 621 622 if (aFromNode != kBrokerNodeName) { 623 NODECONTROLLER_WARNING("Introduction received from non-broker node"); 624 DropPeer(aFromNode); 625 return; 626 } 627 628 MOZ_ASSERT(aIntroduction.mMyPid == base::GetCurrentProcId(), 629 "We're the wrong process to receive this?"); 630 631 if (!mChannelKind->is_valid_handle(aIntroduction.mHandle)) { 632 NODECONTROLLER_WARNING("Could not be introduced to peer %s", 633 ToString(aIntroduction.mName).c_str()); 634 mNode->LostConnectionToNode(aIntroduction.mName); 635 636 auto state = mState.Lock(); 637 state->mPendingMessages.Remove(aIntroduction.mName); 638 return; 639 } 640 641 RefPtr<IPC::Channel> channel = 642 IPC::Channel::Create(std::move(aIntroduction.mHandle), 643 IPC::Channel::MODE_PEER, aIntroduction.mOtherPid); 644 MOZ_ASSERT(channel->GetKind() == mChannelKind); 645 646 auto nodeChannel = MakeRefPtr<NodeChannel>(aIntroduction.mName, channel, this, 647 aIntroduction.mOtherPid); 648 649 { 650 auto state = mState.Lock(); 651 bool isNew = false; 652 state->mPeers.LookupOrInsertWith(aIntroduction.mName, [&]() { 653 isNew = true; 654 return nodeChannel; 655 }); 656 if (!isNew) { 657 // We got a duplicate introduction. This can happen during normal 658 // execution if both sides request an introduction at the same time. We 659 // can just ignore the second one, as they'll arrive in the same order in 660 // both processes. 661 nodeChannel->Close(); 662 return; 663 } 664 665 // Deliver any pending messages, then remove the entry from our table. We do 666 // this while `mState` is still held to ensure that these messages are 667 // all sent before another thread can observe the newly created channel. 668 // As the channel hasn't been `Connect()`-ed yet, this will only queue the 669 // messages up to be sent, so is OK to do with the mutex held. These 670 // messages will be processed to be sent during `Start()` below, which is 671 // performed outside of the lock. 672 if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) { 673 while (!pending->IsEmpty()) { 674 nodeChannel->SendEventMessage(pending->Pop()); 675 } 676 pending.Remove(); 677 } 678 } 679 680 // NodeChannel::Start must be called with the lock not held, as it may lead to 681 // callbacks being made into `OnChannelError` or `OnMessageReceived`, which 682 // will attempt to re-acquire our lock. 683 nodeChannel->Start(); 684 } 685 686 void NodeController::OnRequestIntroduction(const NodeName& aFromNode, 687 const NodeName& aName) { 688 AssertIOThread(); 689 if (NS_WARN_IF(!IsBroker())) { 690 return; 691 } 692 693 RefPtr<NodeChannel> peerA = GetNodeChannel(aFromNode); 694 if (!peerA || aName == mojo::core::ports::kInvalidNodeName) { 695 NODECONTROLLER_WARNING("Invalid OnRequestIntroduction message from node %s", 696 ToString(aFromNode).c_str()); 697 DropPeer(aFromNode); 698 return; 699 } 700 701 RefPtr<NodeChannel> peerB = GetNodeChannel(aName); 702 IPC::Channel::ChannelHandle handleA, handleB; 703 if (!peerB || !mChannelKind->create_raw_pipe(&handleA, &handleB)) { 704 NODECONTROLLER_WARNING( 705 "Rejecting introduction request from '%s' for unknown peer '%s'", 706 ToString(aFromNode).c_str(), ToString(aName).c_str()); 707 708 // We don't know this peer, or ran into issues creating the descriptor! Send 709 // an invalid introduction to content to clean up any pending outbound 710 // messages. 711 NodeChannel::Introduction intro{aName, IPC::Channel::ChannelHandle{}, 712 peerA->OtherPid(), base::kInvalidProcessId}; 713 peerA->Introduce(std::move(intro)); 714 return; 715 } 716 717 NodeChannel::Introduction introA{aName, std::move(handleA), peerA->OtherPid(), 718 peerB->OtherPid()}; 719 NodeChannel::Introduction introB{aFromNode, std::move(handleB), 720 peerB->OtherPid(), peerA->OtherPid()}; 721 peerA->Introduce(std::move(introA)); 722 peerB->Introduce(std::move(introB)); 723 } 724 725 void NodeController::OnAcceptInvite(const NodeName& aFromNode, 726 const NodeName& aRealName, 727 const PortName& aInitialPort) { 728 AssertIOThread(); 729 if (!IsBroker()) { 730 NODECONTROLLER_WARNING("Ignoring AcceptInvite message as non-broker"); 731 return; 732 } 733 734 if (aRealName == mojo::core::ports::kInvalidNodeName || 735 aInitialPort == mojo::core::ports::kInvalidPortName) { 736 NODECONTROLLER_WARNING("Invalid name in AcceptInvite message"); 737 DropPeer(aFromNode); 738 return; 739 } 740 741 bool inserted = false; 742 Invite invite; 743 { 744 auto state = mState.Lock(); 745 746 // Try to remove the source node from our invites list and insert it into 747 // our peers map under the new name. 748 if (state->mInvites.Remove(aFromNode, &invite)) { 749 MOZ_ASSERT(invite.mChannel && invite.mToMerge.is_valid()); 750 state->mPeers.LookupOrInsertWith(aRealName, [&]() { 751 inserted = true; 752 return invite.mChannel; 753 }); 754 } 755 } 756 if (!inserted) { 757 NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s", 758 ToString(aFromNode).c_str()); 759 DropPeer(aFromNode); 760 return; 761 } 762 763 // Update the name of the node. This field is only accessed from the IO 764 // thread, so it's safe to update it without a lock held. 765 invite.mChannel->SetName(aRealName); 766 767 // Start the port merge to allow our existing initial port to begin 768 // communicating with the remote port. 769 PORTS_ALWAYS_OK(mNode->MergePorts(invite.mToMerge, aRealName, aInitialPort)); 770 } 771 772 void NodeController::OnChannelError(const NodeName& aFromNode) { 773 AssertIOThread(); 774 DropPeer(aFromNode); 775 } 776 777 static mojo::core::ports::NodeName RandomNodeName() { 778 return {RandomUint64OrDie(), RandomUint64OrDie()}; 779 } 780 781 bool NodeController::InviteChildProcess( 782 GeckoChildProcessHost* aChildProcessHost, 783 IPC::Channel::ChannelHandle* aClientHandle, ScopedPort* aInitialPort, 784 NodeChannel** aNodeChannel) { 785 MOZ_ASSERT(IsBroker()); 786 AssertIOThread(); 787 788 IPC::Channel::ChannelHandle serverHandle; 789 if (!mChannelKind->create_raw_pipe(&serverHandle, aClientHandle)) { 790 return false; 791 } 792 793 RefPtr<IPC::Channel> channel = IPC::Channel::Create( 794 std::move(serverHandle), IPC::Channel::MODE_BROKER_SERVER, 795 base::kInvalidProcessId); 796 MOZ_ASSERT(channel->GetKind() == mChannelKind); 797 798 // Create the peer with a randomly generated name, and store it in `mInvites`. 799 // This channel and name will be used for communication with the node until it 800 // sends us its' real name in an `AcceptInvite` message. 801 auto ports = CreatePortPair(); 802 auto inviteName = RandomNodeName(); 803 auto nodeChannel = MakeRefPtr<NodeChannel>( 804 inviteName, channel, this, base::kInvalidProcessId, aChildProcessHost); 805 { 806 auto state = mState.Lock(); 807 MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(inviteName), 808 "UUID conflict?"); 809 MOZ_DIAGNOSTIC_ASSERT(!state->mInvites.Contains(inviteName), 810 "UUID conflict?"); 811 state->mInvites.InsertOrUpdate(inviteName, 812 Invite{nodeChannel, ports.second.Release()}); 813 } 814 815 nodeChannel->Start(); 816 817 *aInitialPort = std::move(ports.first); 818 nodeChannel.forget(aNodeChannel); 819 return true; 820 } 821 822 void NodeController::InitBrokerProcess( 823 const IPC::Channel::ChannelKind* aChannelKind) { 824 AssertIOThread(); 825 MOZ_ASSERT(!gNodeController); 826 gNodeController = new NodeController(kBrokerNodeName, aChannelKind); 827 } 828 829 ScopedPort NodeController::InitChildProcess( 830 IPC::Channel::ChannelHandle&& aChannelHandle, base::ProcessId aParentPid) { 831 AssertIOThread(); 832 MOZ_ASSERT(!gNodeController); 833 834 RefPtr<IPC::Channel> channel = IPC::Channel::Create( 835 std::move(aChannelHandle), IPC::Channel::MODE_BROKER_CLIENT, aParentPid); 836 837 auto nodeName = RandomNodeName(); 838 gNodeController = new NodeController(nodeName, channel->GetKind()); 839 840 auto ports = gNodeController->CreatePortPair(); 841 PortRef toMerge = ports.second.Release(); 842 843 // Mark the port as expecting a pending merge. This is a duplicate of the 844 // information tracked by `mPendingMerges`, and was added by upstream 845 // chromium. 846 // See https://chromium-review.googlesource.com/c/chromium/src/+/3289065 847 { 848 mojo::core::ports::SinglePortLocker locker(&toMerge); 849 locker.port()->pending_merge_peer = true; 850 } 851 852 auto nodeChannel = MakeRefPtr<NodeChannel>(kBrokerNodeName, channel, 853 gNodeController, aParentPid); 854 { 855 auto state = gNodeController->mState.Lock(); 856 MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(kBrokerNodeName)); 857 state->mPeers.InsertOrUpdate(kBrokerNodeName, nodeChannel); 858 MOZ_DIAGNOSTIC_ASSERT(!state->mPendingMerges.Contains(kBrokerNodeName)); 859 state->mPendingMerges.LookupOrInsert(kBrokerNodeName) 860 .AppendElement(toMerge); 861 } 862 863 nodeChannel->Start(); 864 nodeChannel->AcceptInvite(nodeName, toMerge.name()); 865 return std::move(ports.first); 866 } 867 868 void NodeController::CleanUp() { 869 AssertIOThread(); 870 MOZ_ASSERT(gNodeController); 871 872 RefPtr<NodeController> nodeController = gNodeController; 873 gNodeController = nullptr; 874 875 // Collect all objects from our state which need to be cleaned up. 876 nsTArray<NodeName> lostConnections; 877 nsTArray<RefPtr<NodeChannel>> channelsToClose; 878 nsTArray<PortRef> portsToClose; 879 { 880 auto state = nodeController->mState.Lock(); 881 for (const auto& chan : state->mPeers) { 882 lostConnections.AppendElement(chan.GetKey()); 883 channelsToClose.AppendElement(chan.GetData()); 884 } 885 for (const auto& pending : state->mPendingMessages.Keys()) { 886 lostConnections.AppendElement(pending); 887 } 888 for (const auto& invite : state->mInvites.Values()) { 889 channelsToClose.AppendElement(invite.mChannel); 890 portsToClose.AppendElement(invite.mToMerge); 891 } 892 for (const auto& pendingPorts : state->mPendingMerges.Values()) { 893 portsToClose.AppendElements(pendingPorts); 894 } 895 state->mPeers.Clear(); 896 state->mPendingMessages.Clear(); 897 state->mInvites.Clear(); 898 state->mPendingMerges.Clear(); 899 } 900 for (auto& nodeChannel : channelsToClose) { 901 nodeChannel->Close(); 902 } 903 for (auto& port : portsToClose) { 904 nodeController->mNode->ClosePort(port); 905 } 906 for (auto& name : lostConnections) { 907 nodeController->mNode->LostConnectionToNode(name); 908 } 909 } 910 911 #undef NODECONTROLLER_LOG 912 #undef NODECONTROLLER_WARNING 913 914 } // namespace mozilla::ipc