tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

commit 351966e29dc903d4f529fa78c0885ef9582f4acb
parent 764284c7fc09766d7448b10d8fc09a77daa29181
Author: Gabriele Svelto <gsvelto@mozilla.com>
Date:   Thu, 27 Nov 2025 15:29:52 +0000

Bug 1989686 - Switched the crash helper IPC code to use I/O completion ports on Windows r=afranchuk

This patch moves the waiting logic out of the IPCServer object and into a
separate IPCQueue one, it also makes the queue spit out entire messages rather
than just the header, effectively removing all sorts of asynchronous I/O from
the IPCServer and hiding its details in the crash_herlper_common crate.

These changes not only switch Windows to using I/O completion ports, but
improves error handling significantly. This in turn makes the crash helper
more reliable while handling child process disconnections.

Last but not least all raw references to HANDLEs held in structs within
the Windows implementation have been replaced with reference-counted ot
borrowed references.

Differential Revision: https://phabricator.services.mozilla.com/D270530

Diffstat:
MCargo.lock | 1+
Mtoolkit/crashreporter/crash_helper_client/src/lib.rs | 4----
Mtoolkit/crashreporter/crash_helper_common/Cargo.toml | 1+
Mtoolkit/crashreporter/crash_helper_common/src/errors.rs | 2--
Mtoolkit/crashreporter/crash_helper_common/src/ipc_channel.rs | 2+-
Mtoolkit/crashreporter/crash_helper_common/src/ipc_channel/windows.rs | 12+++++++-----
Mtoolkit/crashreporter/crash_helper_common/src/ipc_connector.rs | 16+++++++++++-----
Mtoolkit/crashreporter/crash_helper_common/src/ipc_connector/unix.rs | 43+++++++++++++++++++++++++++++++------------
Mtoolkit/crashreporter/crash_helper_common/src/ipc_connector/windows.rs | 139++++++++++++++++++++++++++++++++-----------------------------------------------
Mtoolkit/crashreporter/crash_helper_common/src/ipc_listener/windows.rs | 84+++++++++++++++++++++++++++++--------------------------------------------------
Atoolkit/crashreporter/crash_helper_common/src/ipc_queue.rs | 53+++++++++++++++++++++++++++++++++++++++++++++++++++++
Atoolkit/crashreporter/crash_helper_common/src/ipc_queue/unix.rs | 93+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Atoolkit/crashreporter/crash_helper_common/src/ipc_queue/windows.rs | 281+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtoolkit/crashreporter/crash_helper_common/src/lib.rs | 7+++++--
Mtoolkit/crashreporter/crash_helper_common/src/platform/windows.rs | 185+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
Mtoolkit/crashreporter/crash_helper_server/src/ipc_server.rs | 130++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
Dtoolkit/crashreporter/crash_helper_server/src/ipc_server/unix.rs | 65-----------------------------------------------------------------
Dtoolkit/crashreporter/crash_helper_server/src/ipc_server/windows.rs | 98-------------------------------------------------------------------------------
Mtoolkit/crashreporter/crash_helper_server/src/lib.rs | 84++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
19 files changed, 818 insertions(+), 482 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -1165,6 +1165,7 @@ dependencies = [ name = "crash_helper_common" version = "0.1.0" dependencies = [ + "getrandom 0.3.3", "minidump-writer", "nix 0.30.1", "num-derive", diff --git a/toolkit/crashreporter/crash_helper_client/src/lib.rs b/toolkit/crashreporter/crash_helper_client/src/lib.rs @@ -83,10 +83,6 @@ impl CrashHelperClient { let message = messages::TransferMinidump::new(pid); self.connector.send_message(message)?; - // HACK: Workaround for a macOS-specific bug - #[cfg(target_os = "macos")] - self.connector.poll(nix::poll::PollFlags::POLLIN)?; - let reply = self .connector .recv_reply::<messages::TransferMinidumpReply>()?; diff --git a/toolkit/crashreporter/crash_helper_common/Cargo.toml b/toolkit/crashreporter/crash_helper_common/Cargo.toml @@ -17,6 +17,7 @@ nix = { version = "0.30", features = ["fs", "poll", "socket", "uio"] } minidump-writer = "0.10" [target."cfg(target_os = \"windows\")".dependencies] +getrandom = { version = "0.3" } windows-sys = { version = "0.52", features = [ "Win32_Foundation", "Win32_Security", diff --git a/toolkit/crashreporter/crash_helper_common/src/errors.rs b/toolkit/crashreporter/crash_helper_common/src/errors.rs @@ -35,6 +35,4 @@ pub enum IPCError { TransmissionFailure(PlatformError), #[error("Unexpected message of kind: {0:?}")] UnexpectedMessage(messages::Kind), - #[error("Error while waiting for events, error: {0:?}")] - WaitingFailure(Option<SystemError>), } diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_channel.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_channel.rs @@ -4,7 +4,7 @@ use thiserror::Error; -use crate::{errors::IPCError, IPCListenerError, PlatformError}; +use crate::{errors::IPCError, platform::PlatformError, IPCListenerError}; /***************************************************************************** * Error definitions * diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_channel/windows.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_channel/windows.rs @@ -2,14 +2,15 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::{ffi::CString, hash::RandomState, process}; +use std::{ffi::CString, process}; use windows_sys::Win32::Foundation::{ERROR_ACCESS_DENIED, ERROR_ADDRESS_ALREADY_ASSOCIATED}; use crate::{ ipc_channel::IPCChannelError, - platform::{windows::server_addr, PlatformError}, - IPCConnector, IPCListener, IPCListenerError, Pid, + ipc_listener::IPCListenerError, + platform::windows::{server_addr, PlatformError}, + IPCConnector, IPCListener, Pid, }; pub struct IPCChannel { @@ -70,8 +71,9 @@ impl IPCClientChannel { // We pick the listener name at random, as unlikely as it may be there // could be clashes so try a few times before giving up. for _i in 0..ATTEMPTS { - use std::hash::{BuildHasher, Hasher}; - let random_id = RandomState::new().build_hasher().finish(); + let Ok(random_id) = getrandom::u64() else { + continue; + }; let pipe_name = CString::new(format!( "\\\\.\\pipe\\gecko-crash-helper-child-pipe.{random_id:}" diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_connector.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_connector.rs @@ -2,12 +2,14 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +use std::rc::Rc; + use crate::messages::Header; pub enum IPCEvent { - Connect(IPCConnector), - Header(usize, Header), - Disconnect(usize), + Connect(Rc<IPCConnector>), + Message(IPCConnectorKey, Header, Vec<u8>, Option<AncillaryData>), + Disconnect(IPCConnectorKey), } /***************************************************************************** @@ -15,7 +17,9 @@ pub enum IPCEvent { *****************************************************************************/ #[cfg(target_os = "windows")] -pub use windows::{AncillaryData, IPCConnector, RawAncillaryData, INVALID_ANCILLARY_DATA}; +pub use windows::{ + AncillaryData, IPCConnector, IPCConnectorKey, RawAncillaryData, INVALID_ANCILLARY_DATA, +}; #[cfg(target_os = "windows")] pub(crate) mod windows; @@ -25,7 +29,9 @@ pub(crate) mod windows; *****************************************************************************/ #[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] -pub use unix::{AncillaryData, IPCConnector, RawAncillaryData, INVALID_ANCILLARY_DATA}; +pub use unix::{ + AncillaryData, IPCConnector, IPCConnectorKey, RawAncillaryData, INVALID_ANCILLARY_DATA, +}; #[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] pub(crate) mod unix; diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_connector/unix.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_connector/unix.rs @@ -6,7 +6,9 @@ use crate::platform::linux::{set_socket_cloexec, set_socket_default_flags}; #[cfg(target_os = "macos")] use crate::platform::macos::{set_socket_cloexec, set_socket_default_flags}; -use crate::{ignore_eintr, IntoRawAncillaryData, PlatformError, ProcessHandle, IO_TIMEOUT}; +use crate::{ + ignore_eintr, platform::PlatformError, IntoRawAncillaryData, ProcessHandle, IO_TIMEOUT, +}; use nix::{ cmsg_space, @@ -17,7 +19,7 @@ use nix::{ use std::{ ffi::{CStr, CString}, io::{IoSlice, IoSliceMut}, - os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, + os::fd::{AsFd, AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, str::FromStr, }; @@ -38,6 +40,8 @@ impl IntoRawAncillaryData for AncillaryData { // This must match `kInvalidHandle` in `mfbt/UniquePtrExt.h` pub const INVALID_ANCILLARY_DATA: RawAncillaryData = -1; +pub type IPCConnectorKey = RawFd; + pub struct IPCConnector { socket: OwnedFd, } @@ -82,7 +86,7 @@ impl IPCConnector { /// command-line to a child process. This only works for newly /// created connectors because they are explicitly created as inheritable. pub fn serialize(&self) -> CString { - CString::new(self.socket.as_raw_fd().to_string()).unwrap() + CString::new(self.as_raw().to_string()).unwrap() } /// Deserialize a connector from an argument passed on the command-line. @@ -102,11 +106,15 @@ impl IPCConnector { self.socket.into_raw() } - pub fn as_raw_ref(&self) -> BorrowedFd<'_> { - self.socket.as_fd() + pub(crate) fn as_raw(&self) -> RawFd { + self.socket.as_raw_fd() } - pub fn poll(&self, flags: PollFlags) -> Result<(), PlatformError> { + pub fn key(&self) -> IPCConnectorKey { + self.socket.as_raw_fd() + } + + fn poll(&self, flags: PollFlags) -> Result<(), PlatformError> { let timeout = PollTimeout::from(IO_TIMEOUT); let res = ignore_eintr!(poll( &mut [PollFd::new(self.socket.as_fd(), flags)], @@ -134,6 +142,11 @@ impl IPCConnector { where T: Message, { + // HACK: Workaround for a macOS-specific bug + #[cfg(target_os = "macos")] + self.poll(PollFlags::POLLIN) + .map_err(IPCError::ReceptionFailure)?; + let header = self.recv_header()?; if header.kind != T::kind() { @@ -150,7 +163,7 @@ impl IPCConnector { let scm = ControlMessage::ScmRights(&scm_fds); let res = ignore_eintr!(sendmsg::<()>( - self.socket.as_raw_fd(), + self.as_raw(), &iov, &[scm], MsgFlags::empty(), @@ -162,7 +175,10 @@ impl IPCConnector { if bytes_sent == buff.len() { Ok(()) } else { - Err(PlatformError::SendTooShort(buff.len(), bytes_sent)) + Err(PlatformError::SendTooShort { + expected: buff.len(), + sent: bytes_sent, + }) } } Err(code) => Err(PlatformError::SendFailure(code)), @@ -182,7 +198,7 @@ impl IPCConnector { } } - pub fn recv_header(&self) -> Result<messages::Header, IPCError> { + pub(crate) fn recv_header(&self) -> Result<messages::Header, IPCError> { let (header, _) = self.recv(messages::HEADER_SIZE)?; messages::Header::decode(&header).map_err(IPCError::BadMessage) } @@ -196,7 +212,7 @@ impl IPCConnector { let mut iov = [IoSliceMut::new(&mut buff)]; let res = ignore_eintr!(recvmsg::<()>( - self.socket.as_raw_fd(), + self.as_raw(), &mut iov, Some(&mut cmsg_buffer), MsgFlags::empty(), @@ -212,7 +228,7 @@ impl IPCConnector { let res = match res { #[cfg(target_os = "macos")] Err(_code @ Errno::ENOMEM) => ignore_eintr!(recvmsg::<()>( - self.socket.as_raw_fd(), + self.as_raw(), &mut iov, Some(&mut cmsg_buffer), MsgFlags::empty(), @@ -232,7 +248,10 @@ impl IPCConnector { }; if res.bytes != expected_size { - return Err(PlatformError::ReceiveTooShort(expected_size, res.bytes)); + return Err(PlatformError::ReceiveTooShort { + expected: expected_size, + received: res.bytes, + }); } Ok((buff, fd)) diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_connector/windows.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_connector/windows.rs @@ -4,10 +4,9 @@ use crate::{ errors::IPCError, - messages::{self, Message}, - platform::{ - windows::{create_manual_reset_event, get_last_error, OverlappedOperation}, - PlatformError, + messages::{self, Message, HEADER_SIZE}, + platform::windows::{ + create_manual_reset_event, get_last_error, OverlappedOperation, PlatformError, }, IntoRawAncillaryData, IO_TIMEOUT, }; @@ -15,7 +14,9 @@ use crate::{ use std::{ ffi::{CStr, OsString}, io::Error, - os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle}, + os::windows::io::{ + AsHandle, AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle, + }, ptr::null_mut, rc::Rc, str::FromStr, @@ -24,7 +25,7 @@ use std::{ use windows_sys::Win32::{ Foundation::{ DuplicateHandle, DUPLICATE_CLOSE_SOURCE, DUPLICATE_SAME_ACCESS, ERROR_FILE_NOT_FOUND, - ERROR_INVALID_MESSAGE, ERROR_PIPE_BUSY, FALSE, HANDLE, INVALID_HANDLE_VALUE, + ERROR_PIPE_BUSY, FALSE, HANDLE, INVALID_HANDLE_VALUE, }, Security::SECURITY_ATTRIBUTES, Storage::FileSystem::{ @@ -69,13 +70,13 @@ fn extract_buffer_and_handle(buffer: Vec<u8>) -> Result<(Vec<u8>, Option<OwnedHa Ok((data.to_vec(), handle)) } +pub type IPCConnectorKey = usize; + pub struct IPCConnector { /// A connected pipe handle handle: Rc<OwnedHandle>, /// A handle to an event which will be used for overlapped I/O on the pipe event: OwnedHandle, - /// Stores the only pending operation we might have on the pipe - overlapped: Option<OverlappedOperation>, /// The process at the other end of the pipe, this is needed to send /// ancillary data and a send operation will fail if not set. process: Option<OwnedHandle>, @@ -88,7 +89,6 @@ impl IPCConnector { Ok(IPCConnector { handle: Rc::new(handle), event, - overlapped: None, process: None, }) } @@ -109,8 +109,12 @@ impl IPCConnector { self.process = Some(process); } - pub fn event_raw_handle(&self) -> HANDLE { - self.event.as_raw_handle() as HANDLE + pub(crate) fn as_raw(&self) -> HANDLE { + self.handle.as_raw_handle() as HANDLE + } + + pub fn key(&self) -> IPCConnectorKey { + self.handle.as_raw_handle() as IPCConnectorKey } pub fn connect(server_addr: &CStr) -> Result<IPCConnector, IPCError> { @@ -219,97 +223,68 @@ impl IPCConnector { where T: Message, { - // Send the message header - self.send(&message.header(), None)?; + self.send_message_internal(message) + .map_err(IPCError::TransmissionFailure) + } - // Send the message payload + fn send_message_internal<T>(&self, message: T) -> Result<(), PlatformError> + where + T: Message, + { + let header = message.header(); let (payload, ancillary_data) = message.into_payload(); - self.send(&payload, ancillary_data)?; - Ok(()) + // Send the message header + OverlappedOperation::send(&self.handle, self.event.as_handle(), header)?; + + // Send the message payload plus the optional handles + let handle = if let Some(handle) = ancillary_data { + self.clone_handle(handle)? + } else { + INVALID_ANCILLARY_DATA + }; + + let mut buffer = Vec::<u8>::with_capacity(HANDLE_SIZE + payload.len()); + buffer.extend(handle.to_ne_bytes()); + buffer.extend(payload); + + OverlappedOperation::send(&self.handle, self.event.as_handle(), buffer) } pub fn recv_reply<T>(&self) -> Result<T, IPCError> where T: Message, { - let header = self.recv_header()?; + let header = self + .recv_buffer(messages::HEADER_SIZE) + .map_err(IPCError::ReceptionFailure)?; + let header = messages::Header::decode(&header).map_err(IPCError::BadMessage)?; if header.kind != T::kind() { - return Err(IPCError::ReceptionFailure( - crate::platform::PlatformError::IOError(ERROR_INVALID_MESSAGE), - )); + return Err(IPCError::UnexpectedMessage(header.kind)); } - let (data, _) = self.recv(header.size)?; - T::decode(&data, None).map_err(IPCError::from) + let (buffer, handle) = self.recv(header.size)?; + T::decode(&buffer, handle).map_err(IPCError::from) } - fn recv_header(&self) -> Result<messages::Header, IPCError> { - let (header, _) = self.recv(messages::HEADER_SIZE)?; - messages::Header::decode(&header).map_err(IPCError::BadMessage) + pub(crate) fn sched_recv_header(&self) -> Result<OverlappedOperation, IPCError> { + OverlappedOperation::sched_recv(&self.handle, HEADER_SIZE) + .map_err(IPCError::ReceptionFailure) } - pub fn sched_recv_header(&mut self) -> Result<(), IPCError> { - if self.overlapped.is_some() { - // We're already waiting for a header. - return Ok(()); - } - - self.overlapped = Some( - OverlappedOperation::sched_recv( - &self.handle, - self.event_raw_handle(), - HANDLE_SIZE + messages::HEADER_SIZE, - ) - .map_err(IPCError::ReceptionFailure)?, - ); - Ok(()) - } - - pub fn collect_header(&mut self) -> Result<messages::Header, IPCError> { - // We should never call collect_header() on a connector that wasn't - // waiting for one, so panic in that scenario. - let overlapped = self.overlapped.take().unwrap(); - let buffer = overlapped - .collect_recv(/* wait */ false) + pub(crate) fn recv( + &self, + expected_size: usize, + ) -> Result<(Vec<u8>, Option<AncillaryData>), IPCError> { + let buffer = self + .recv_buffer(HANDLE_SIZE + expected_size) .map_err(IPCError::ReceptionFailure)?; - let (data, _) = extract_buffer_and_handle(buffer)?; - messages::Header::decode(data.as_ref()).map_err(IPCError::BadMessage) - } - - fn send(&self, buff: &[u8], handle: Option<AncillaryData>) -> Result<(), IPCError> { - let handle = if let Some(handle) = handle { - self.clone_handle(handle) - .map_err(IPCError::TransmissionFailure)? - } else { - INVALID_ANCILLARY_DATA - }; - - let mut buffer = Vec::<u8>::with_capacity(HANDLE_SIZE + buff.len()); - buffer.extend(handle.to_ne_bytes()); - buffer.extend(buff); - - let overlapped = - OverlappedOperation::sched_send(&self.handle, self.event_raw_handle(), buffer) - .map_err(IPCError::TransmissionFailure)?; - - overlapped - .complete_send(/* wait */ true) - .map_err(IPCError::TransmissionFailure) + extract_buffer_and_handle(buffer) } - pub fn recv(&self, expected_size: usize) -> Result<(Vec<u8>, Option<AncillaryData>), IPCError> { - let overlapped = OverlappedOperation::sched_recv( - &self.handle, - self.event_raw_handle(), - HANDLE_SIZE + expected_size, - ) - .map_err(IPCError::ReceptionFailure)?; - let buffer = overlapped - .collect_recv(/* wait */ true) - .map_err(IPCError::ReceptionFailure)?; - extract_buffer_and_handle(buffer) + fn recv_buffer(&self, expected_size: usize) -> Result<Vec<u8>, PlatformError> { + OverlappedOperation::recv(&self.handle, self.event.as_handle(), expected_size) } /// Clone a handle in the destination process, this is required to diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_listener/windows.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_listener/windows.rs @@ -4,14 +4,13 @@ use crate::{ errors::IPCError, - platform::{ - windows::{create_manual_reset_event, server_addr, OverlappedOperation}, - PlatformError, - }, - IPCConnector, IPCListenerError, Pid, + ipc_listener::IPCListenerError, + platform::windows::{get_last_error, server_addr, OverlappedOperation, PlatformError}, + IPCConnector, Pid, }; use std::{ + cell::RefCell, ffi::{CStr, CString, OsString}, os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle}, ptr::null_mut, @@ -19,7 +18,7 @@ use std::{ str::FromStr, }; use windows_sys::Win32::{ - Foundation::{GetLastError, HANDLE, INVALID_HANDLE_VALUE, TRUE}, + Foundation::{HANDLE, INVALID_HANDLE_VALUE, TRUE}, Security::SECURITY_ATTRIBUTES, Storage::FileSystem::{ FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, @@ -34,66 +33,53 @@ pub struct IPCListener { /// The name of the pipe this listener will be bound to server_addr: CString, /// A named pipe handle listening for incoming connections - handle: Rc<OwnedHandle>, - /// Stores the only pending operation we might have on the pipe + handle: RefCell<Rc<OwnedHandle>>, + /// Stores the only listen operation that might be pending overlapped: Option<OverlappedOperation>, - /// A handle to an event which will be used for overlapped I/O on the pipe - event: OwnedHandle, } impl IPCListener { pub(crate) fn new(server_addr: CString) -> Result<IPCListener, IPCListenerError> { let pipe = create_named_pipe(&server_addr, /* first_instance */ true) .map_err(IPCListenerError::PipeCreationFailure)?; - let event = create_manual_reset_event().map_err(IPCListenerError::AcceptError)?; Ok(IPCListener { server_addr, - handle: Rc::new(pipe), + handle: RefCell::new(Rc::new(pipe)), overlapped: None, - event, }) } - pub fn event_raw_handle(&self) -> HANDLE { - self.event.as_raw_handle() as HANDLE + pub(crate) fn as_raw(&self) -> HANDLE { + self.handle.borrow().as_raw_handle() as HANDLE } pub(crate) fn address(&self) -> &CStr { &self.server_addr } + pub(crate) fn sched_listen(&self) -> Result<OverlappedOperation, IPCListenerError> { + OverlappedOperation::listen(&self.handle.borrow()).map_err(IPCListenerError::ListenError) + } + pub(crate) fn listen(&mut self) -> Result<(), IPCListenerError> { - self.overlapped = Some( - OverlappedOperation::listen(&self.handle, self.event_raw_handle()) - .map_err(IPCListenerError::ListenError)?, - ); + self.overlapped = Some(self.sched_listen()?); Ok(()) } pub fn accept(&mut self) -> Result<IPCConnector, IPCListenerError> { - let connected_pipe = { - // We extract the overlapped operation within this scope so - // that it's dropped right away. This ensures that only one - // reference to the listener's handle remains. - let overlapped = self - .overlapped - .take() - .expect("Accepting a connection without listening first"); - overlapped - .accept(self.handle.as_raw_handle() as HANDLE) - .map_err(IPCListenerError::AcceptError)?; - - let new_pipe = create_named_pipe(&self.server_addr, /* first_instance */ false) - .map_err(IPCListenerError::PipeCreationFailure)?; - - std::mem::replace(&mut self.handle, Rc::new(new_pipe)) - }; + let overlapped = self + .overlapped + .take() + .expect("Accepting a connection without listening first"); + overlapped.accept().map_err(IPCListenerError::AcceptError)?; + self.replace_pipe() + } - // Once we've accepted a new connection and replaced the listener's - // pipe we need to listen again before we return, so that we're ready - // for the next iteration. - self.listen()?; + pub(crate) fn replace_pipe(&self) -> Result<IPCConnector, IPCListenerError> { + let new_pipe = create_named_pipe(&self.server_addr, /* first_instance */ false) + .map_err(IPCListenerError::PipeCreationFailure)?; + let connected_pipe = self.handle.replace(Rc::new(new_pipe)); // We can guarantee that there's only one reference to this handle at // this point in time. @@ -106,7 +92,7 @@ impl IPCListener { /// command-line to a child process. This only works for newly /// created listeners because they are explicitly created as inheritable. pub fn serialize(&self) -> OsString { - let raw_handle = self.handle.as_raw_handle() as usize; + let raw_handle = self.handle.borrow().as_raw_handle() as usize; OsString::from_str(raw_handle.to_string().as_ref()).unwrap() } @@ -118,20 +104,12 @@ impl IPCListener { let handle = usize::from_str(string).map_err(|_e| IPCError::ParseError)?; // SAFETY: This is a handle we passed in ourselves. let handle = unsafe { OwnedHandle::from_raw_handle(handle as RawHandle) }; - let event = create_manual_reset_event().map_err(IPCListenerError::CreationError)?; - let mut listener = IPCListener { + Ok(IPCListener { server_addr, - handle: Rc::new(handle), + handle: RefCell::new(Rc::new(handle)), overlapped: None, - event, - }; - - // Since we've inherited this handler we need to start a new - // asynchronous operation to listen for incoming connections. - listener.listen()?; - - Ok(listener) + }) } } @@ -176,7 +154,7 @@ fn create_named_pipe( }; if pipe == INVALID_HANDLE_VALUE { - return Err(PlatformError::CreatePipeFailure(unsafe { GetLastError() })); + return Err(PlatformError::CreatePipeFailure(get_last_error())); } // SAFETY: We just verified that the handle is valid. diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_queue.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_queue.rs @@ -0,0 +1,53 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use thiserror::Error; + +use crate::{ + errors::{IPCError, SystemError}, + messages::MessageError, + IPCListenerError, +}; + +/***************************************************************************** + * Error definitions * + *****************************************************************************/ + +#[derive(Debug, Error)] +pub enum IPCQueueError { + #[error("Could not create queue: {0}")] + CreationFailure(SystemError), + #[error("Could not register with queue: {0}")] + RegistrationFailure(SystemError), + #[error("Could not post an event on the queue: {0}")] + PostEventFailure(SystemError), + #[error("Could not wait for events: {0}")] + WaitError(SystemError), + #[error("Underlying IPC connector error: {0}")] + IPCError(#[from] IPCError), + #[error("Underlying IPC listener error: {0}")] + IPCListenerError(#[from] IPCListenerError), + #[error("Underlying message error: {0}")] + MessageError(#[from] MessageError), +} + +/***************************************************************************** + * Windows * + *****************************************************************************/ + +#[cfg(target_os = "windows")] +pub use windows::IPCQueue; + +#[cfg(target_os = "windows")] +pub(crate) mod windows; + +/***************************************************************************** + * Android, macOS & Linux * + *****************************************************************************/ + +#[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] +pub use unix::IPCQueue; + +#[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] +pub(crate) mod unix; diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_queue/unix.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_queue/unix.rs @@ -0,0 +1,93 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; +use std::{collections::HashMap, os::fd::BorrowedFd, rc::Rc}; + +use crate::{ + ignore_eintr, ipc_queue::IPCQueueError, IPCConnector, IPCConnectorKey, IPCEvent, IPCListener, +}; + +pub struct IPCQueue { + connectors: HashMap<IPCConnectorKey, Rc<IPCConnector>>, +} + +impl IPCQueue { + pub fn new(_listener: IPCListener) -> Result<IPCQueue, IPCQueueError> { + let connectors = HashMap::with_capacity(10); + Ok(IPCQueue { connectors }) + } + + pub fn add_connector(&mut self, connector: &Rc<IPCConnector>) -> Result<(), IPCQueueError> { + let res = self.connectors.insert(connector.key(), connector.clone()); + debug_assert!(res.is_none()); + Ok(()) + } + + pub fn add_listener(&self, _listener: &IPCListener) -> Result<(), IPCQueueError> { + Ok(()) + } + + pub fn wait_for_events(&mut self) -> Result<Vec<IPCEvent>, IPCQueueError> { + let mut pollfds = Vec::with_capacity(self.connectors.len()); + // SAFETY: All the fds held by the queue are known to be valid. + pollfds.extend(self.connectors.iter().map(|connector| { + PollFd::new( + unsafe { BorrowedFd::borrow_raw(connector.1.as_raw()) }, + PollFlags::POLLIN, + ) + })); + + let mut events = Vec::<IPCEvent>::new(); + let mut num_events = ignore_eintr!(poll(&mut pollfds, PollTimeout::NONE)) + .map_err(IPCQueueError::WaitError)?; + + for (pollfd, (&key, connector)) in pollfds.iter().zip(&self.connectors) { + // revents() returns None only if the kernel sends back data + // that nix does not understand, we can safely assume this + // never happens in practice hence the unwrap(). + let Some(revents) = pollfd.revents() else { + // TODO: We should log this error, disconnect the socket or do + // both things. Probably needs a new event type. + continue; + }; + + if revents.contains(PollFlags::POLLHUP) { + events.push(IPCEvent::Disconnect(key)); + // If a process was disconnected then skip all further + // processing of the socket. This wouldn't matter normally, + // but on macOS calling recvmsg() on a hung-up socket seems + // to trigger a kernel panic, one we've already encountered + // in the past. Doing things this way avoids the panic + // while having no real downsides. + continue; + } + + if revents.contains(PollFlags::POLLIN) { + let header = connector.recv_header()?; + let payload = connector + .recv(header.size) + .map_err(IPCQueueError::IPCError)?; + events.push(IPCEvent::Message(key, header, payload.0, payload.1)); + } + + if !revents.is_empty() { + num_events -= 1; + + if num_events == 0 { + break; + } + } + } + + // Remove all connectors for which we've received disconnect events. + for event in &events { + if let IPCEvent::Disconnect(key) = event { + self.connectors.remove(key); + } + } + + Ok(events) + } +} diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_queue/windows.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_queue/windows.rs @@ -0,0 +1,281 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use std::{ + collections::HashMap, + mem::MaybeUninit, + os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle}, + ptr::null_mut, + rc::Rc, +}; +use windows_sys::Win32::{ + Foundation::{ERROR_BROKEN_PIPE, FALSE, HANDLE, INVALID_HANDLE_VALUE}, + System::{ + Threading::INFINITE, + IO::{CreateIoCompletionPort, GetQueuedCompletionStatus, OVERLAPPED}, + }, +}; + +use crate::{ + errors::IPCError, + ipc_queue::IPCQueueError, + messages::Header, + platform::{ + windows::{get_last_error, OverlappedOperation}, + PlatformError, + }, + IPCConnector, IPCConnectorKey, IPCEvent, IPCListener, +}; + +const CONCURRENT_THREADS: u32 = 1; + +struct IPCQueueElement { + connector: Rc<IPCConnector>, + operation: Option<OverlappedOperation>, +} + +pub struct IPCQueue { + connectors: HashMap<IPCConnectorKey, IPCQueueElement>, + listener: IPCListener, + listen_operation: Option<OverlappedOperation>, + port: OwnedHandle, +} + +impl IPCQueue { + pub fn new(listener: IPCListener) -> Result<IPCQueue, IPCQueueError> { + let listener_port = listener.as_raw(); + + // Create a new completion port that allows only one active thread. + let port = unsafe { + CreateIoCompletionPort( + /* FileHandle */ INVALID_HANDLE_VALUE, + /* ExistingCompletionPort */ 0, + /* CompletionKey */ 0, + CONCURRENT_THREADS, + ) as RawHandle + }; + + if port.is_null() { + return Err(IPCQueueError::CreationFailure(get_last_error())); + } + + let mut queue = IPCQueue { + connectors: HashMap::with_capacity(10), + listener, + listen_operation: None, + port: unsafe { OwnedHandle::from_raw_handle(port) }, + }; + + queue.add_handle(listener_port)?; + + Ok(queue) + } + + pub fn add_connector(&mut self, connector: &Rc<IPCConnector>) -> Result<(), IPCQueueError> { + self.add_handle(connector.as_raw())?; + self.insert_connector(connector); + Ok(()) + } + + fn insert_connector(&mut self, connector: &Rc<IPCConnector>) { + let res = self.connectors.insert( + connector.key(), + IPCQueueElement { + connector: connector.clone(), + operation: None, + }, + ); + debug_assert!(res.is_none()); + } + + fn add_handle(&mut self, handle: HANDLE) -> Result<(), IPCQueueError> { + let port = unsafe { + CreateIoCompletionPort( + handle, + self.port.as_raw_handle() as HANDLE, + // Use the connector's handle as the events' key + handle as usize, + CONCURRENT_THREADS, + ) as RawHandle + }; + + if port.is_null() { + return Err(IPCQueueError::RegistrationFailure(get_last_error())); + } + + Ok(()) + } + + pub fn wait_for_events(&mut self) -> Result<Vec<IPCEvent>, IPCQueueError> { + let mut events = Vec::with_capacity(1); + + for element in self.connectors.values_mut() { + if element.operation.is_none() { + match element.connector.sched_recv_header() { + Ok(operation) => element.operation = Some(operation), + Err(_error @ IPCError::ReceptionFailure(PlatformError::BrokenPipe)) => { + events.push(IPCEvent::Disconnect(element.connector.key())); + } + Err(error) => return Err(IPCQueueError::from(error)), + } + } + } + + for event in &events { + if let IPCEvent::Disconnect(key) = event { + self.connectors.remove(key); + } + } + + if self.connectors.len() == 0 { + // The last client disconnected. + return Ok(events); + } + + if self.listen_operation.is_none() { + self.listen_operation = Some(self.listener.sched_listen()?); + } + + let mut number_of_bytes_transferred = MaybeUninit::<u32>::uninit(); + let mut completion_key = MaybeUninit::<IPCConnectorKey>::uninit(); + let mut overlapped = MaybeUninit::<*mut OVERLAPPED>::uninit(); + + let res = unsafe { + GetQueuedCompletionStatus( + self.port.as_raw_handle() as HANDLE, + number_of_bytes_transferred.as_mut_ptr(), + completion_key.as_mut_ptr(), + overlapped.as_mut_ptr(), + INFINITE, + ) + }; + + // SAFETY: `overlapped` will always be populated by + // `GetQueueCompletionStatus()` so it's safe to assume initialization. + let overlapped = unsafe { overlapped.assume_init() }; + + if res == FALSE { + let err = get_last_error(); + + // If `overlapped` is non-null then the completion packet contained + // the result of a failed I/O operation. We only handle failures + // caused by a broken pipes, all others are considered fatal. + if !overlapped.is_null() && (err == ERROR_BROKEN_PIPE) { + // SAFETY: `overlapped` was non-null, so `completion_key` has + // also been populated by `GetQueuedCompletionStatus()`. + let completion_key = unsafe { completion_key.assume_init() }; + let element = self.connectors.remove(&completion_key); + debug_assert!(element.is_some(), "Completion on missing connector"); + events.push(IPCEvent::Disconnect(completion_key)); + } else { + return Err(IPCQueueError::WaitError(err)); + } + } else { + // SAFETY: `GetQueueCompletionStatus()` successfully retrieved a + // completed I/O operation, all parameters have been populated. + let (number_of_bytes_transferred, completion_key) = unsafe { + ( + number_of_bytes_transferred.assume_init(), + completion_key.assume_init(), + ) + }; + + if number_of_bytes_transferred == 0 { + // This is an event on the listener + debug_assert!( + self.listener.as_raw() as IPCConnectorKey == completion_key, + "Completion event doesn't match the listener" + ); + let operation = self.listen_operation.take(); + if let Some(operation) = operation { + operation + .accept() + .map_err(|_e| IPCQueueError::RegistrationFailure(0))?; + } + let connector = Rc::new(self.listener.replace_pipe()?); + self.insert_connector(&connector); + + // After the pipe is connected the listener handle will have been + // replaced with a new one, so associate the new handle with the + // completion queue. + self.add_handle(self.listener.as_raw())?; + + events.push(IPCEvent::Connect(connector)); + } else { + let element = self + .connectors + .get_mut(&completion_key) + .expect("Event did not match a known connector"); + let operation = element + .operation + .take() + .expect("No pending receive operation"); + let buffer = &operation.collect_recv(); + let header = Header::decode(buffer)?; + let payload = element.connector.recv(header.size); + match payload { + Ok(payload) => { + events.push(IPCEvent::Message( + completion_key, + header, + payload.0, + payload.1, + )); + } + Err(_error @ IPCError::ReceptionFailure(PlatformError::BrokenPipe)) => { + // This connector will generate a disconnection event + // when `wait_for_events()` is called again. Do nothing + // for the time being. + } + Err(error) => return Err(IPCQueueError::from(error)), + } + } + } + + Ok(events) + } +} + +impl Drop for IPCQueue { + fn drop(&mut self) { + // Cancel all the pending operations. + for element in self.connectors.values_mut() { + if let Some(operation) = &mut element.operation { + if !operation.cancel() { + operation.leak(); + } + } + } + + if let Some(operation) = &mut self.listen_operation { + if !operation.cancel() { + operation.leak(); + } + } + + // Drain the queue, once no more events are left we can safely drop it. + loop { + let mut number_of_bytes_transferred: u32 = 0; + let mut completion_key: IPCConnectorKey = 0; + let mut overlapped: *mut OVERLAPPED = null_mut(); + + let res = unsafe { + GetQueuedCompletionStatus( + self.port.as_raw_handle() as HANDLE, + &mut number_of_bytes_transferred, + &mut completion_key, + &mut overlapped, + 0, + ) + }; + + // TODO: Check that we got enough completion events? + + if res == FALSE && overlapped.is_null() { + // TODO: Maybe check the error and report odd ones? + break; + } + } + } +} diff --git a/toolkit/crashreporter/crash_helper_common/src/lib.rs b/toolkit/crashreporter/crash_helper_common/src/lib.rs @@ -11,6 +11,7 @@ mod breakpad; mod ipc_channel; mod ipc_connector; mod ipc_listener; +mod ipc_queue; mod platform; use messages::MessageError; @@ -19,10 +20,12 @@ use messages::MessageError; pub use crate::breakpad::{BreakpadChar, BreakpadData, BreakpadRawData, Pid}; pub use crate::ipc_channel::{IPCChannel, IPCClientChannel}; pub use crate::ipc_connector::{ - AncillaryData, IPCConnector, IPCEvent, RawAncillaryData, INVALID_ANCILLARY_DATA, + AncillaryData, IPCConnector, IPCConnectorKey, IPCEvent, RawAncillaryData, + INVALID_ANCILLARY_DATA, }; pub use crate::ipc_listener::{IPCListener, IPCListenerError}; -pub use crate::platform::{PlatformError, ProcessHandle}; +pub use crate::ipc_queue::IPCQueue; +pub use crate::platform::ProcessHandle; #[cfg(target_os = "windows")] pub use crate::platform::server_addr; diff --git a/toolkit/crashreporter/crash_helper_common/src/platform/windows.rs b/toolkit/crashreporter/crash_helper_common/src/platform/windows.rs @@ -5,15 +5,17 @@ use crate::{Pid, IO_TIMEOUT}; use std::{ ffi::CString, - mem::zeroed, - os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle}, + mem::{zeroed, MaybeUninit}, + os::windows::io::{ + AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, OwnedHandle, RawHandle, + }, ptr::{null, null_mut}, rc::Rc, }; use thiserror::Error; use windows_sys::Win32::{ Foundation::{ - GetLastError, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NOT_FOUND, ERROR_PIPE_CONNECTED, + GetLastError, ERROR_BROKEN_PIPE, ERROR_IO_PENDING, ERROR_NOT_FOUND, ERROR_PIPE_CONNECTED, FALSE, HANDLE, WAIT_TIMEOUT, WIN32_ERROR, }, Storage::FileSystem::{ReadFile, WriteFile}, @@ -30,6 +32,8 @@ pub type ProcessHandle = OwnedHandle; pub enum PlatformError { #[error("Could not accept incoming connection: {0}")] AcceptFailed(WIN32_ERROR), + #[error("Broken pipe")] + BrokenPipe, #[error("Failed to duplicate clone handle")] CloneHandleFailed(#[source] std::io::Error), #[error("Could not create event: {0}")] @@ -83,29 +87,29 @@ pub(crate) fn create_manual_reset_event() -> Result<OwnedHandle, PlatformError> Ok(unsafe { OwnedHandle::from_raw_handle(raw_handle) }) } -fn set_event(handle: HANDLE) -> Result<(), PlatformError> { +fn set_event(handle: BorrowedHandle) -> Result<(), PlatformError> { // SAFETY: This is always safe, even when passing an invalid handle. - if unsafe { SetEvent(handle) } == FALSE { + if unsafe { SetEvent(handle.as_raw_handle() as HANDLE) } == FALSE { Err(PlatformError::SetEventFailed(get_last_error())) } else { Ok(()) } } -fn reset_event(handle: HANDLE) -> Result<(), PlatformError> { +fn reset_event(handle: BorrowedHandle) -> Result<(), PlatformError> { // SAFETY: This is always safe, even when passing an invalid handle. - if unsafe { ResetEvent(handle) } == FALSE { + if unsafe { ResetEvent(handle.as_raw_handle() as HANDLE) } == FALSE { Err(PlatformError::ResetEventFailed(get_last_error())) } else { Ok(()) } } -fn cancel_overlapped_io(handle: HANDLE, overlapped: &mut OVERLAPPED) -> bool { +fn cancel_overlapped_io(handle: BorrowedHandle, overlapped: &OVERLAPPED) -> bool { // SAFETY: the pointer to the overlapped structure is always valid as the // structure is passed by reference. The handle should be valid but will // be handled properly in case it isn't. - let res = unsafe { CancelIoEx(handle, overlapped) }; + let res = unsafe { CancelIoEx(handle.as_raw_handle() as HANDLE, overlapped) }; if res == FALSE { if get_last_error() == ERROR_NOT_FOUND { // There was no pending operation @@ -115,14 +119,19 @@ fn cancel_overlapped_io(handle: HANDLE, overlapped: &mut OVERLAPPED) -> bool { return false; } + if overlapped.hEvent == 0 { + // No associated event, don't wait + return true; + } + // Just wait for the operation to finish, we don't care about the result - let mut number_of_bytes_transferred: u32 = 0; + let mut number_of_bytes_transferred = MaybeUninit::<u32>::uninit(); // SAFETY: Same as above let res = unsafe { GetOverlappedResultEx( - handle, + handle.as_raw_handle() as HANDLE, overlapped, - &mut number_of_bytes_transferred, + number_of_bytes_transferred.as_mut_ptr(), INFINITE, /* bAlertable */ FALSE, ) @@ -144,11 +153,8 @@ enum OverlappedOperationType { impl OverlappedOperation { // Asynchronously listen for an incoming connection - pub(crate) fn listen( - handle: &Rc<OwnedHandle>, - event: HANDLE, - ) -> Result<OverlappedOperation, PlatformError> { - let mut overlapped = Self::overlapped_with_event(event)?; + pub(crate) fn listen(handle: &Rc<OwnedHandle>) -> Result<OverlappedOperation, PlatformError> { + let mut overlapped = Self::overlapped(); // SAFETY: We guarantee that the handle and OVERLAPPED object are both // valid and remain so while used by this function. @@ -162,13 +168,12 @@ impl OverlappedOperation { return Err(PlatformError::ListenFailed(error)); } - if error == ERROR_PIPE_CONNECTED { - // The operation completed synchronously, set the event so that - // waiting on it will return immediately. - set_event(event)?; - } else if error != ERROR_IO_PENDING { - return Err(PlatformError::ListenFailed(error)); - } + match error { + ERROR_PIPE_CONNECTED | ERROR_IO_PENDING => { + // The operations succeeded, we'll get a completion event + } + error => return Err(PlatformError::ListenFailed(error)), + }; Ok(OverlappedOperation { handle: handle.clone(), @@ -179,29 +184,23 @@ impl OverlappedOperation { // Synchronously accept an incoming connection, does not wait and fails if // no incoming connection is present. - pub(crate) fn accept(mut self, handle: HANDLE) -> Result<(), PlatformError> { + pub(crate) fn accept(mut self) -> Result<(), PlatformError> { let overlapped = self.overlapped.take().unwrap(); - let mut _number_of_bytes_transferred: u32 = 0; + let mut number_of_bytes_transferred = MaybeUninit::<u32>::uninit(); // SAFETY: The pointer to the OVERLAPPED structure is under our // control and thus guaranteed to be valid. let res = unsafe { GetOverlappedResultEx( - handle, + self.handle.as_raw_handle() as HANDLE, overlapped.as_ref(), - &mut _number_of_bytes_transferred, + number_of_bytes_transferred.as_mut_ptr(), 0, /* bAlertable */ FALSE, ) }; if res == FALSE { - let error = get_last_error(); - if error == ERROR_IO_INCOMPLETE { - // The I/O operation did not complete yet - self.cancel_or_leak(overlapped, None); - } - - return Err(PlatformError::AcceptFailed(error)); + return Err(PlatformError::AcceptFailed(get_last_error())); } Ok(()) @@ -210,33 +209,38 @@ impl OverlappedOperation { fn await_io( mut self, optype: OverlappedOperationType, - wait: bool, ) -> Result<Option<Vec<u8>>, PlatformError> { let overlapped = self.overlapped.take().unwrap(); let buffer = self.buffer.take().unwrap(); - let mut number_of_bytes_transferred: u32 = 0; + let mut number_of_bytes_transferred = MaybeUninit::<u32>::uninit(); // SAFETY: All the pointers passed to this call are under our control // and thus guaranteed to be valid. let res = unsafe { GetOverlappedResultEx( self.handle.as_raw_handle() as HANDLE, overlapped.as_ref(), - &mut number_of_bytes_transferred, - if wait { IO_TIMEOUT as u32 } else { 0 }, + number_of_bytes_transferred.as_mut_ptr(), + IO_TIMEOUT as u32, /* bAlertable */ FALSE, ) }; if res == FALSE { let error = get_last_error(); - if (wait && (error == WAIT_TIMEOUT)) || (!wait && (error == ERROR_IO_INCOMPLETE)) { + if error == WAIT_TIMEOUT { // The I/O operation did not complete yet self.cancel_or_leak(overlapped, Some(buffer)); + } else if error == ERROR_BROKEN_PIPE { + return Err(PlatformError::BrokenPipe); } return Err(PlatformError::IOError(error)); } + // SAFETY: We've verified that `number_of_bytes_transferred` has been + // populated by the `GetOverlappedResultEx()` call. + let number_of_bytes_transferred = unsafe { number_of_bytes_transferred.assume_init() }; + if number_of_bytes_transferred as usize != buffer.len() { return Err(match optype { OverlappedOperationType::Read => PlatformError::ReceiveTooShort { @@ -256,12 +260,16 @@ impl OverlappedOperation { }) } - pub(crate) fn sched_recv( + fn sched_recv_internal( handle: &Rc<OwnedHandle>, - event: HANDLE, + event: Option<BorrowedHandle>, expected_size: usize, ) -> Result<OverlappedOperation, PlatformError> { - let mut overlapped = Self::overlapped_with_event(event)?; + let mut overlapped = if let Some(event) = event { + OverlappedOperation::overlapped_with_event(event)? + } else { + OverlappedOperation::overlapped() + }; let mut buffer = vec![0u8; expected_size]; let number_of_bytes_to_read: u32 = expected_size .try_into() @@ -281,9 +289,13 @@ impl OverlappedOperation { let error = get_last_error(); if res != FALSE { - // The operation completed synchronously, set the event so that - // waiting on it will return immediately. - set_event(event)?; + if let Some(event) = event { + // The operation completed synchronously, if we have an event + // set it so that waiting on it will return immediately. + set_event(event)?; + } + } else if error == ERROR_BROKEN_PIPE { + return Err(PlatformError::BrokenPipe); } else if error != ERROR_IO_PENDING { return Err(PlatformError::IOError(error)); } @@ -295,15 +307,33 @@ impl OverlappedOperation { }) } - pub(crate) fn collect_recv(self, wait: bool) -> Result<Vec<u8>, PlatformError> { - Ok(self.await_io(OverlappedOperationType::Read, wait)?.unwrap()) + pub(crate) fn recv( + handle: &Rc<OwnedHandle>, + event: BorrowedHandle<'_>, + expected_size: usize, + ) -> Result<Vec<u8>, PlatformError> { + let overlapped = Self::sched_recv_internal(handle, Some(event), expected_size)?; + overlapped + .await_io(OverlappedOperationType::Read) + .map(|buffer| buffer.unwrap()) } - pub(crate) fn sched_send( + pub(crate) fn sched_recv( handle: &Rc<OwnedHandle>, - event: HANDLE, - mut buffer: Vec<u8>, + expected_size: usize, ) -> Result<OverlappedOperation, PlatformError> { + Self::sched_recv_internal(handle, None, expected_size) + } + + pub(crate) fn collect_recv(mut self) -> Vec<u8> { + self.buffer.take().expect("Missing receive buffer") + } + + pub(crate) fn send( + handle: &Rc<OwnedHandle>, + event: BorrowedHandle<'_>, + mut buffer: Vec<u8>, + ) -> Result<(), PlatformError> { let mut overlapped = Self::overlapped_with_event(event)?; let number_of_bytes_to_write: u32 = buffer .len() @@ -327,33 +357,64 @@ impl OverlappedOperation { // The operation completed synchronously, set the event so that // waiting on it will return immediately. set_event(event)?; + } else if error == ERROR_BROKEN_PIPE { + return Err(PlatformError::BrokenPipe); } else if error != ERROR_IO_PENDING { return Err(PlatformError::IOError(error)); } - Ok(OverlappedOperation { + let overlapped = OverlappedOperation { handle: handle.clone(), overlapped: Some(overlapped), buffer: Some(buffer), - }) - } + }; - pub(crate) fn complete_send(self, wait: bool) -> Result<(), PlatformError> { - self.await_io(OverlappedOperationType::Write, wait)?; - Ok(()) + overlapped + .await_io(OverlappedOperationType::Write) + .map(|buffer| { + debug_assert!(buffer.is_none()); + }) } - fn overlapped_with_event(event: HANDLE) -> Result<Box<OVERLAPPED>, PlatformError> { + fn overlapped_with_event(event: BorrowedHandle<'_>) -> Result<Box<OVERLAPPED>, PlatformError> { reset_event(event)?; + // We set the last bit of the `hEvent` field to prevent this overlapped + // operation from generating completion events. The event handle will + // be notified instead when it completes. Ok(Box::new(OVERLAPPED { - hEvent: event, + hEvent: event.as_raw_handle() as HANDLE | 1, ..unsafe { zeroed() } })) } + fn overlapped() -> Box<OVERLAPPED> { + Box::new(unsafe { zeroed() }) + } + + /// Cancel the pending operation but leave the buffers intact. It's the + /// caller's responsibility to wait for the operation to complete and free + /// the buffers. + pub(crate) fn cancel(&self) -> bool { + if let Some(overlapped) = self.overlapped.as_deref() { + return cancel_overlapped_io(self.handle.as_handle(), overlapped); + } + + true + } + + /// Leak the buffers involved in the operation. + pub(crate) fn leak(&mut self) { + if let Some(overlapped) = self.overlapped.take() { + Box::leak(overlapped); + if let Some(buffer) = self.buffer.take() { + buffer.leak(); + } + } + } + fn cancel_or_leak(&self, mut overlapped: Box<OVERLAPPED>, buffer: Option<Vec<u8>>) { - if !cancel_overlapped_io(self.handle.as_raw_handle() as HANDLE, overlapped.as_mut()) { + if !cancel_overlapped_io(self.handle.as_handle(), overlapped.as_mut()) { // If we cannot cancel the operation we must leak the // associated buffers so that they're available in case it // ever completes. @@ -370,6 +431,10 @@ impl Drop for OverlappedOperation { let overlapped = self.overlapped.take(); let buffer = self.buffer.take(); if let Some(overlapped) = overlapped { + if overlapped.hEvent == 0 { + return; // This operation should have already been cancelled. + } + self.cancel_or_leak(overlapped, buffer); } } diff --git a/toolkit/crashreporter/crash_helper_server/src/ipc_server.rs b/toolkit/crashreporter/crash_helper_server/src/ipc_server.rs @@ -3,15 +3,13 @@ * You can obtain one at http://mozilla.org/MPL/2.0/. */ use anyhow::Result; -use crash_helper_common::{errors::IPCError, messages, IPCConnector, IPCEvent, IPCListener}; +use crash_helper_common::{ + messages::Header, AncillaryData, IPCConnector, IPCConnectorKey, IPCEvent, IPCListener, IPCQueue, +}; +use std::{collections::HashMap, rc::Rc}; use crate::crash_generation::{CrashGenerator, MessageResult}; -#[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] -mod unix; -#[cfg(target_os = "windows")] -mod windows; - #[derive(PartialEq)] pub enum IPCServerState { Running, @@ -21,61 +19,73 @@ pub enum IPCServerState { #[derive(PartialEq)] enum IPCEndpoint { Parent, // A connection to the parent process - #[allow(dead_code)] + Child, // A connection to the child process + #[allow(dead_code)] External, // A connection to an external process } struct IPCConnection { - connector: IPCConnector, + connector: Rc<IPCConnector>, endpoint: IPCEndpoint, } pub(crate) struct IPCServer { - #[cfg_attr(unix, allow(dead_code))] - listener: IPCListener, - connections: Vec<IPCConnection>, + /// Platform-specific mechanism to wait for events. This will contain + /// references to the connectors so needs to be the first element in + /// the structure so that it's dropped first. + queue: IPCQueue, + connections: HashMap<IPCConnectorKey, IPCConnection>, } impl IPCServer { - pub(crate) fn new(listener: IPCListener, connector: IPCConnector) -> IPCServer { - IPCServer { - listener, - connections: vec![IPCConnection { + pub(crate) fn new(listener: IPCListener, connector: IPCConnector) -> Result<IPCServer> { + let connector = Rc::new(connector); + let mut queue = IPCQueue::new(listener)?; + queue.add_connector(&connector)?; + + let mut connections = HashMap::with_capacity(10); + connections.insert( + connector.key(), + IPCConnection { connector, endpoint: IPCEndpoint::Parent, - }], - } + }, + ); + + Ok(IPCServer { queue, connections }) } - pub(crate) fn run( - &mut self, - generator: &mut CrashGenerator, - ) -> Result<IPCServerState, IPCError> { - let events = self.wait_for_events()?; + pub(crate) fn run(&mut self, generator: &mut CrashGenerator) -> Result<IPCServerState> { + let events = self.queue.wait_for_events()?; - // We reverse the order of events, so that we start processing them - // from the highest indexes toward the lowest. If we did the opposite - // removed connections would invalidate the successive indexes. - for event in events.into_iter().rev() { + for event in events.into_iter() { match event { IPCEvent::Connect(connector) => { - self.connections.push(IPCConnection { - connector, - endpoint: IPCEndpoint::External, - }); + self.connections.insert( + connector.key(), + IPCConnection { + connector, + endpoint: IPCEndpoint::External, + }, + ); } - IPCEvent::Header(index, header) => { - let res = self.handle_message(index, &header, generator); - if let Err(error) = res { + IPCEvent::Message(key, header, payload, ancillary_data) => { + if let Err(error) = + self.handle_message(key, &header, payload, ancillary_data, generator) + { log::error!( - "Error {error} while handling a message of {:?} kind", + "Error {error} when handling a message of kind {:?}", header.kind ); } } - IPCEvent::Disconnect(index) => { - let connection = self.connections.remove(index); + IPCEvent::Disconnect(key) => { + let connection = self + .connections + .remove(&key) + .expect("Disconnection event but no corresponding connection"); + if connection.endpoint == IPCEndpoint::Parent { // The main process disconnected, leave return Ok(IPCServerState::ClientDisconnected); @@ -89,37 +99,41 @@ impl IPCServer { fn handle_message( &mut self, - index: usize, - header: &messages::Header, + key: IPCConnectorKey, + header: &Header, + data: Vec<u8>, + ancillary_data: Option<AncillaryData>, generator: &mut CrashGenerator, ) -> Result<()> { let connection = self .connections - .get_mut(index) - .expect("Invalid connector index"); - let connector = &mut connection.connector; - let (data, ancillary_data) = connector.recv(header.size)?; + .get(&key) + .expect("Event received on non-existing connection"); + let connector = &connection.connector; - let connection = match connection.endpoint { + match connection.endpoint { IPCEndpoint::Parent => { - generator.parent_message(connector, header.kind, &data, ancillary_data) + let res = + generator.parent_message(connector, header.kind, &data, ancillary_data)?; + if let MessageResult::Connection(connector) = res { + let connector = Rc::new(connector); + self.queue.add_connector(&connector)?; + self.connections.insert( + connector.key(), + IPCConnection { + connector, + endpoint: IPCEndpoint::Child, + }, + ); + } } - IPCEndpoint::Child => generator.child_message(header.kind, &data, ancillary_data), - IPCEndpoint::External => { - generator.external_message(connector, header.kind, &data, ancillary_data) + IPCEndpoint::Child => { + generator.child_message(header.kind, &data, ancillary_data)?; } - }?; - - match connection { - MessageResult::Connection(connector) => { - self.connections.push(IPCConnection { - connector, - endpoint: IPCEndpoint::Child, - }); + IPCEndpoint::External => { + generator.external_message(connector, header.kind, &data, ancillary_data)?; } - - MessageResult::None => {} - } + }; Ok(()) } diff --git a/toolkit/crashreporter/crash_helper_server/src/ipc_server/unix.rs b/toolkit/crashreporter/crash_helper_server/src/ipc_server/unix.rs @@ -1,65 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this file, - * You can obtain one at http://mozilla.org/MPL/2.0/. */ - -use crash_helper_common::{errors::IPCError, ignore_eintr, IPCEvent}; -use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; - -use super::IPCServer; - -impl IPCServer { - pub fn wait_for_events(&mut self) -> Result<Vec<IPCEvent>, IPCError> { - let mut pollfds = Vec::with_capacity(self.connections.len()); - pollfds.extend( - self.connections.iter().map(|connection| { - PollFd::new(connection.connector.as_raw_ref(), PollFlags::POLLIN) - }), - ); - - let mut events = Vec::<IPCEvent>::new(); - let mut num_events = ignore_eintr!(poll(&mut pollfds, PollTimeout::NONE)) - .map_err(|errno| IPCError::WaitingFailure(Some(errno)))?; - - for (index, pollfd) in pollfds.iter().enumerate() { - // revents() returns None only if the kernel sends back data - // that nix does not understand, we can safely assume this - // never happens in practice hence the unwrap(). - let revents = pollfd.revents().unwrap(); - - if revents.contains(PollFlags::POLLHUP) { - events.push(IPCEvent::Disconnect(index)); - // If a process was disconnected then skip all further - // processing of the socket. This wouldn't matter normally, - // but on macOS calling recvmsg() on a hung-up socket seems - // to trigger a kernel panic, one we've already encountered - // in the past. Doing things this way avoids the panic - // while having no real downsides. - continue; - } - - if revents.contains(PollFlags::POLLIN) { - // SAFETY: The index is guaranteed to be >0 and within - // the bounds of the connections array. - let connection = unsafe { self.connections.get_unchecked(index) }; - let header = connection.connector.recv_header(); - if let Ok(header) = header { - // Note that if we encounter a failure we don't propagate - // it, when the socket gets disconnected we'll get a - // POLLHUP event anyway so deal with disconnections there - // instead of here. - events.push(IPCEvent::Header(index, header)); - } - } - - if !revents.is_empty() { - num_events -= 1; - - if num_events == 0 { - break; - } - } - } - - Ok(events) - } -} diff --git a/toolkit/crashreporter/crash_helper_server/src/ipc_server/windows.rs b/toolkit/crashreporter/crash_helper_server/src/ipc_server/windows.rs @@ -1,98 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this file, - * You can obtain one at http://mozilla.org/MPL/2.0/. */ - -use std::convert::TryInto; - -use crash_helper_common::{errors::IPCError, IPCEvent, PlatformError}; -use log::error; -use windows_sys::Win32::{ - Foundation::{ERROR_BROKEN_PIPE, FALSE, HANDLE, WAIT_OBJECT_0}, - System::{ - SystemServices::MAXIMUM_WAIT_OBJECTS, - Threading::{WaitForMultipleObjects, INFINITE}, - }, -}; - -use super::IPCServer; - -impl IPCServer { - pub fn wait_for_events(&mut self) -> Result<Vec<IPCEvent>, IPCError> { - for connection in self.connections.iter_mut() { - // TODO: We might get a broken pipe error here which would cause us to - // fail instead of just dropping the disconnected connection. - connection.connector.sched_recv_header()?; - } - - let native_events = self.collect_events(); - - // SAFETY: This is less than MAXIMUM_WAIT_OBJECTS - let native_events_len: u32 = unsafe { native_events.len().try_into().unwrap_unchecked() }; - - let res = unsafe { - WaitForMultipleObjects( - native_events_len, - native_events.as_ptr(), - FALSE, // bWaitAll - INFINITE, - ) - }; - - if res >= (WAIT_OBJECT_0 + native_events_len) { - return Err(IPCError::WaitingFailure(None)); - } - - let index = (res - WAIT_OBJECT_0) as usize; - - let mut events = Vec::<IPCEvent>::new(); - if index == 0 { - if let Ok(connector) = self.listener.accept() { - events.push(IPCEvent::Connect(connector)); - } - } else { - let index = index - 1; - // SAFETY: The index is guaranteed to be within the bounds of the connections array. - let connection = unsafe { self.connections.get_unchecked_mut(index) }; - let header = connection.connector.collect_header(); - - match header { - Ok(header) => { - events.push(IPCEvent::Header(index, header)); - } - Err(error) => match error { - IPCError::ReceptionFailure( - _error @ PlatformError::IOError(ERROR_BROKEN_PIPE), - ) => { - events.push(IPCEvent::Disconnect(index)); - } - _ => return Err(error), - }, - } - } - - Ok(events) - } - - /// This currently returns a vector that is no longer than - /// `MAXIMUM_WAIT_OBJECTS`, so its contents can be safely passed to - /// a `WaitForMultipleObjects()` call. - fn collect_events(&self) -> Vec<HANDLE> { - let mut events = Vec::with_capacity(1 + self.connections.len()); - - events.push(self.listener.event_raw_handle()); - for connection in self.connections.iter() { - events.push(connection.connector.event_raw_handle()); - } - - // HACK: When we hit this limit we should be splitting this list in - // multiple groups of at most MAXIMUM_WAIT_OBJECTS objects and have - // several threads wait on the groups, then wait on the threads - // themselves. - if events.len() > MAXIMUM_WAIT_OBJECTS.try_into().unwrap() { - error!("More than {MAXIMUM_WAIT_OBJECTS} processes connecting to the crash helper"); - events.truncate(MAXIMUM_WAIT_OBJECTS.try_into().unwrap()); - } - - events - } -} diff --git a/toolkit/crashreporter/crash_helper_server/src/lib.rs b/toolkit/crashreporter/crash_helper_server/src/lib.rs @@ -16,7 +16,10 @@ use crash_helper_common::Pid; #[cfg(target_os = "android")] use crash_helper_common::RawAncillaryData; use crash_helper_common::{BreakpadData, BreakpadRawData, IPCConnector, IPCListener}; -use std::ffi::{c_char, CStr, OsString}; +use std::{ + ffi::{c_char, CStr, OsString}, + fmt::Display, +}; use crash_generation::CrashGenerator; use ipc_server::{IPCServer, IPCServerState}; @@ -52,26 +55,25 @@ pub unsafe extern "C" fn crash_generator_logic_desktop( .unwrap(); let minidump_path = OsString::from(minidump_path); let listener = unsafe { CStr::from_ptr(listener) }; - let listener = IPCListener::deserialize(listener, client_pid) - .map_err(|error| { - log::error!("Could not parse the crash generator's listener (error: {error})"); - }) - .unwrap(); + let listener = unwrap_with_message( + IPCListener::deserialize(listener, client_pid), + "Could not parse the crash generator's listener", + ); let pipe = unsafe { CStr::from_ptr(pipe) }; - let connector = IPCConnector::deserialize(pipe) - .map_err(|error| { - log::error!("Could not parse the crash generator's connector (error: {error})"); - }) - .unwrap(); + let connector = unwrap_with_message( + IPCConnector::deserialize(pipe), + "Could not parse the crash generator's connector", + ); - let crash_generator = CrashGenerator::new(breakpad_data, minidump_path) - .map_err(|error| { - log::error!("Could not create the crash generator (error: {error})"); - error - }) - .unwrap(); + let crash_generator = unwrap_with_message( + CrashGenerator::new(breakpad_data, minidump_path), + "Could not create the crash generator", + ); - let ipc_server = IPCServer::new(listener, connector); + let ipc_server = unwrap_with_message( + IPCServer::new(listener, connector), + "Could not create the IPC server", + ); main_loop(ipc_server, crash_generator) } @@ -102,26 +104,28 @@ pub unsafe extern "C" fn crash_generator_logic_android( .into_string() .unwrap(); let minidump_path = OsString::from(minidump_path); - let crash_generator = CrashGenerator::new(breakpad_data, minidump_path) - .map_err(|error| { - log::error!("Could not create the crash generator (error: {error})"); - error - }) - .unwrap(); - - let listener = IPCListener::new(0).unwrap(); - // SAFETY: The `pipe` file descriptor passed in from the caller is - // guaranteed to be valid. - let connector = unsafe { IPCConnector::from_raw_ancillary(pipe) } - .map_err(|error| { - log::error!("Could not use the pipe (error: {error})"); - }) - .unwrap(); - let ipc_server = IPCServer::new(listener, connector); // On Android the main thread is used to respond to the intents so we // can't block it. Run the crash generation loop in a separate thread. - let _ = std::thread::spawn(move || main_loop(ipc_server, crash_generator)); + let _ = std::thread::spawn(move || { + let crash_generator = unwrap_with_message( + CrashGenerator::new(breakpad_data, minidump_path), + "Could not create the crash generator", + ); + + let listener = IPCListener::new(0).unwrap(); + // SAFETY: The `pipe` file descriptor passed in from the caller is + // guaranteed to be valid. + let connector = unwrap_with_message( + unsafe { IPCConnector::from_raw_ancillary(pipe) }, + "Could not use the pipe", + ); + let ipc_server = unwrap_with_message( + IPCServer::new(listener, connector), + "Could not create the IPC server", + ); + main_loop(ipc_server, crash_generator) + }); } fn main_loop(mut ipc_server: IPCServer, mut crash_generator: CrashGenerator) -> i32 { @@ -179,3 +183,13 @@ fn daemonize() { } } } + +fn unwrap_with_message<T, E: Display>(res: Result<T, E>, error_string: &str) -> T { + match res { + Ok(value) => value, + Err(error) => { + log::error!("{error_string} (error: {error})"); + panic!("{} (error: {})", error_string, error); + } + } +}