commit 5f0c529213a77608bc51c18daeba638a0ce78919 parent a775b2c4830edf8bbafdf9637959af0e228ecb98 Author: agoloman <agoloman@mozilla.com> Date: Fri, 21 Nov 2025 22:42:43 +0200 Revert "Bug 1989686 - Switched the crash helper IPC code to use I/O completion ports on Windows r=afranchuk" for causing bc failures @browser_ext_port_disconnect_on_crash.js. This reverts commit b6d948b3ef18a270db36fc48ec48a2a9acaa1b98. Revert "Bug 1989686 - Refactor errors in per-type groups r=afranchuk" This reverts commit d1a18f4c23a54b7d2872c92de1fb0b6def7d1b1b. Revert "Bug 1989686 - Stop duplicating handles when scheduling overlapped operations r=afranchuk" This reverts commit 6e574d3b53f8d8a125c77209e4477d906aa0c2d8. Diffstat:
27 files changed, 607 insertions(+), 1119 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock @@ -1166,7 +1166,6 @@ dependencies = [ name = "crash_helper_common" version = "0.1.0" dependencies = [ - "getrandom 0.3.3", "minidump-writer", "nix 0.30.1", "num-derive", diff --git a/toolkit/crashreporter/crash_helper_client/src/lib.rs b/toolkit/crashreporter/crash_helper_client/src/lib.rs @@ -83,6 +83,10 @@ impl CrashHelperClient { let message = messages::TransferMinidump::new(pid); self.connector.send_message(message)?; + // HACK: Workaround for a macOS-specific bug + #[cfg(target_os = "macos")] + self.connector.poll(nix::poll::PollFlags::POLLIN)?; + let reply = self .connector .recv_reply::<messages::TransferMinidumpReply>()?; diff --git a/toolkit/crashreporter/crash_helper_common/Cargo.toml b/toolkit/crashreporter/crash_helper_common/Cargo.toml @@ -17,7 +17,6 @@ nix = { version = "0.30", features = ["fs", "poll", "socket", "uio"] } minidump-writer = "0.10" [target."cfg(target_os = \"windows\")".dependencies] -getrandom = { version = "0.3" } windows-sys = { version = "0.52", features = [ "Win32_Foundation", "Win32_Security", diff --git a/toolkit/crashreporter/crash_helper_common/src/breakpad/unix_strings.rs b/toolkit/crashreporter/crash_helper_common/src/breakpad/unix_strings.rs @@ -9,7 +9,7 @@ use std::{ os::unix::ffi::OsStringExt, }; -use crate::{messages::MessageError, BreakpadString}; +use crate::{errors::MessageError, BreakpadString}; use super::BreakpadChar; diff --git a/toolkit/crashreporter/crash_helper_common/src/breakpad/windows_strings.rs b/toolkit/crashreporter/crash_helper_common/src/breakpad/windows_strings.rs @@ -9,7 +9,7 @@ use std::{ os::windows::ffi::{OsStrExt, OsStringExt}, }; -use crate::{messages::MessageError, BreakpadChar, BreakpadString}; +use crate::{errors::MessageError, BreakpadChar, BreakpadString}; // BreakpadString trait implementation for Windows native UTF-16 strings impl BreakpadString for OsString { diff --git a/toolkit/crashreporter/crash_helper_common/src/errors.rs b/toolkit/crashreporter/crash_helper_common/src/errors.rs @@ -2,37 +2,63 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::num::TryFromIntError; +use std::{ + array::TryFromSliceError, + ffi::{FromBytesWithNulError, NulError}, + num::TryFromIntError, +}; use thiserror::Error; #[cfg(not(target_os = "windows"))] -pub use nix::errno::Errno as SystemError; +use nix::errno::Errno as SystemError; #[cfg(target_os = "windows")] -pub use windows_sys::Win32::Foundation::WIN32_ERROR as SystemError; - -use crate::{ - messages::{self, MessageError}, - platform::PlatformError, -}; +use windows_sys::Win32::Foundation::WIN32_ERROR as SystemError; #[derive(Debug, Error)] pub enum IPCError { #[error("Message error")] BadMessage(#[from] MessageError), - #[error("Could not connect to a socket: {0}")] + #[error("Generic system error: {0}")] + System(SystemError), + #[error("Could not bind socket to an address, error: {0}")] + BindFailed(SystemError), + #[error("Could not listen on a socket, error: {0}")] + ListenFailed(SystemError), + #[error("Could not accept an incoming connection, error: {0}")] + AcceptFailed(SystemError), + #[error("Could not connect to a socket, error: {0}")] ConnectionFailure(SystemError), - #[error("Failed to create a connector: {0}")] - CreationFailure(PlatformError), + #[error("Could not send data, error: {0}")] + TransmissionFailure(SystemError), + #[error("Could not receive data, error: {0}")] + ReceptionFailure(SystemError), + #[error("Error while waiting for events, error: {0:?}")] + WaitingFailure(Option<SystemError>), #[error("Buffer length exceeds a 32-bit integer")] InvalidSize(#[from] TryFromIntError), #[error("Error while parsing a file descriptor string")] ParseError, - #[error("Could not receive data: {0}")] - ReceptionFailure(PlatformError), - #[error("An operation timed out")] - Timeout, - #[error("Could not send data: {0}")] - TransmissionFailure(PlatformError), - #[error("Unexpected message of kind: {0:?}")] - UnexpectedMessage(messages::Kind), + #[error("Failed to duplicate clone handle")] + CloneHandleFailed(#[source] std::io::Error), + #[cfg(target_os = "windows")] + #[error("Missing destination process handle")] + MissingProcessHandle, +} + +#[derive(Debug, Error)] +pub enum MessageError { + #[error("Truncated message")] + Truncated, + #[error("Message kind is invalid")] + InvalidKind, + #[error("The message contained an invalid payload")] + InvalidData, + #[error("Missing ancillary data")] + MissingAncillary, + #[error("Invalid message size")] + InvalidSize(#[from] TryFromSliceError), + #[error("Missing nul terminator")] + MissingNul(#[from] FromBytesWithNulError), + #[error("Missing nul terminator")] + InteriorNul(#[from] NulError), } diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_channel.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_channel.rs @@ -2,24 +2,6 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use thiserror::Error; - -use crate::{errors::IPCError, platform::PlatformError, IPCListenerError}; - -/***************************************************************************** - * Error definitions * - *****************************************************************************/ - -#[derive(Debug, Error)] -pub enum IPCChannelError { - #[error("Could not create connector: {0}")] - Connector(#[from] IPCError), - #[error("Could not create a listener: {0}")] - Listener(#[from] IPCListenerError), - #[error("Could not create a socketpair: {0}")] - SocketPair(#[from] PlatformError), -} - /***************************************************************************** * Windows * *****************************************************************************/ diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_channel/unix.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_channel/unix.rs @@ -8,7 +8,7 @@ use std::process; use crate::platform::linux::unix_socketpair; #[cfg(target_os = "macos")] use crate::platform::macos::unix_socketpair; -use crate::{ipc_channel::IPCChannelError, IPCConnector, IPCListener, Pid}; +use crate::{errors::IPCError, IPCConnector, IPCListener, Pid}; pub struct IPCChannel { listener: IPCListener, @@ -21,11 +21,11 @@ impl IPCChannel { /// will use the current process PID as part of its address and two /// connected endpoints. The listener and the server-side endpoint can be /// inherited by a child process, the client-side endpoint cannot. - pub fn new() -> Result<IPCChannel, IPCChannelError> { + pub fn new() -> Result<IPCChannel, IPCError> { let listener = IPCListener::new(process::id() as Pid)?; // Only the server-side socket will be left open after an exec(). - let pair = unix_socketpair().map_err(IPCChannelError::SocketPair)?; + let pair = unix_socketpair().map_err(IPCError::System)?; let client_endpoint = IPCConnector::from_fd(pair.0)?; let server_endpoint = IPCConnector::from_fd_inheritable(pair.1)?; @@ -52,8 +52,8 @@ pub struct IPCClientChannel { impl IPCClientChannel { /// Create a new IPC channel for use between one of the browser's child /// processes and the crash helper. - pub fn new() -> Result<IPCClientChannel, IPCChannelError> { - let pair = unix_socketpair().map_err(IPCChannelError::SocketPair)?; + pub fn new() -> Result<IPCClientChannel, IPCError> { + let pair = unix_socketpair().map_err(IPCError::System)?; let client_endpoint = IPCConnector::from_fd(pair.0)?; let server_endpoint = IPCConnector::from_fd(pair.1)?; diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_channel/windows.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_channel/windows.rs @@ -2,14 +2,13 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::{ffi::CString, process}; +use std::{ffi::CString, hash::RandomState, process}; -use windows_sys::Win32::Foundation::{ERROR_ACCESS_DENIED, ERROR_ADDRESS_ALREADY_ASSOCIATED}; +use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED; use crate::{ - ipc_channel::IPCChannelError, - ipc_listener::IPCListenerError, - platform::windows::{server_addr, PlatformError}, + errors::IPCError, + platform::windows::{get_last_error, server_addr}, IPCConnector, IPCListener, Pid, }; @@ -23,7 +22,7 @@ impl IPCChannel { /// Create a new IPCChannel, this includes a listening endpoint that /// will use the current process PID as part of its address and two /// connected endpoints. - pub fn new() -> Result<IPCChannel, IPCChannelError> { + pub fn new() -> Result<IPCChannel, IPCError> { let pid = process::id() as Pid; let mut listener = IPCListener::new(server_addr(pid))?; listener.listen()?; @@ -53,7 +52,7 @@ pub struct IPCClientChannel { impl IPCClientChannel { /// Create a new IPC channel for use between one of the browser's child /// processes and the crash helper. - pub fn new() -> Result<IPCClientChannel, IPCChannelError> { + pub fn new() -> Result<IPCClientChannel, IPCError> { let mut listener = Self::create_listener()?; listener.listen()?; let client_endpoint = IPCConnector::connect(listener.address())?; @@ -65,15 +64,14 @@ impl IPCClientChannel { }) } - fn create_listener() -> Result<IPCListener, IPCListenerError> { + fn create_listener() -> Result<IPCListener, IPCError> { const ATTEMPTS: u32 = 5; // We pick the listener name at random, as unlikely as it may be there // could be clashes so try a few times before giving up. for _i in 0..ATTEMPTS { - let Ok(random_id) = getrandom::u64() else { - continue; - }; + use std::hash::{BuildHasher, Hasher}; + let random_id = RandomState::new().build_hasher().finish(); let pipe_name = CString::new(format!( "\\\\.\\pipe\\gecko-crash-helper-child-pipe.{random_id:}" @@ -81,19 +79,14 @@ impl IPCClientChannel { .unwrap(); match IPCListener::new(pipe_name) { Ok(listener) => return Ok(listener), - Err( - _error @ IPCListenerError::CreationError(PlatformError::CreatePipeFailure( - ERROR_ACCESS_DENIED, - )), - ) => {} // Try again + Err(_error @ IPCError::System(ERROR_ACCESS_DENIED)) => {} // Try again Err(error) => return Err(error), } } - // If we got to this point give up. - Err(IPCListenerError::CreationError( - PlatformError::CreatePipeFailure(ERROR_ADDRESS_ALREADY_ASSOCIATED), - )) + // If we got to this point return whatever was the last error we + // encountered along the way. + Err(IPCError::System(get_last_error())) } /// Deconstruct the IPC channel, returning the listening endpoint, diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_connector.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_connector.rs @@ -2,14 +2,12 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::rc::Rc; - use crate::messages::Header; pub enum IPCEvent { - Connect(Rc<IPCConnector>), - Message(IPCConnectorKey, Header, Vec<u8>, Option<AncillaryData>), - Disconnect(IPCConnectorKey), + Connect(IPCConnector), + Header(usize, Header), + Disconnect(usize), } /***************************************************************************** @@ -17,9 +15,7 @@ pub enum IPCEvent { *****************************************************************************/ #[cfg(target_os = "windows")] -pub use windows::{ - AncillaryData, IPCConnector, IPCConnectorKey, RawAncillaryData, INVALID_ANCILLARY_DATA, -}; +pub use windows::{AncillaryData, IPCConnector, RawAncillaryData, INVALID_ANCILLARY_DATA}; #[cfg(target_os = "windows")] pub(crate) mod windows; @@ -29,9 +25,7 @@ pub(crate) mod windows; *****************************************************************************/ #[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] -pub use unix::{ - AncillaryData, IPCConnector, IPCConnectorKey, RawAncillaryData, INVALID_ANCILLARY_DATA, -}; +pub use unix::{AncillaryData, IPCConnector, RawAncillaryData, INVALID_ANCILLARY_DATA}; #[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] pub(crate) mod unix; diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_connector/unix.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_connector/unix.rs @@ -6,9 +6,7 @@ use crate::platform::linux::{set_socket_cloexec, set_socket_default_flags}; #[cfg(target_os = "macos")] use crate::platform::macos::{set_socket_cloexec, set_socket_default_flags}; -use crate::{ - ignore_eintr, platform::PlatformError, IntoRawAncillaryData, ProcessHandle, IO_TIMEOUT, -}; +use crate::{ignore_eintr, IntoRawAncillaryData, ProcessHandle, IO_TIMEOUT}; use nix::{ cmsg_space, @@ -19,7 +17,7 @@ use nix::{ use std::{ ffi::{CStr, CString}, io::{IoSlice, IoSliceMut}, - os::fd::{AsFd, AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, + os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, str::FromStr, }; @@ -40,8 +38,6 @@ impl IntoRawAncillaryData for AncillaryData { // This must match `kInvalidHandle` in `mfbt/UniquePtrExt.h` pub const INVALID_ANCILLARY_DATA: RawAncillaryData = -1; -pub type IPCConnectorKey = RawFd; - pub struct IPCConnector { socket: OwnedFd, } @@ -52,7 +48,7 @@ impl IPCConnector { /// will not be possible to inerhit this connector in a child process. pub(crate) fn from_fd(socket: OwnedFd) -> Result<IPCConnector, IPCError> { let connector = IPCConnector::from_fd_inheritable(socket)?; - set_socket_cloexec(connector.socket.as_fd()).map_err(IPCError::CreationFailure)?; + set_socket_cloexec(connector.socket.as_fd()).map_err(IPCError::System)?; Ok(connector) } @@ -60,7 +56,7 @@ impl IPCConnector { /// `FD_CLOEXEC` flag will not be set on the underlying socket and thus it /// will be possible to inherit this connector in a child process. pub(crate) fn from_fd_inheritable(socket: OwnedFd) -> Result<IPCConnector, IPCError> { - set_socket_default_flags(socket.as_fd()).map_err(IPCError::CreationFailure)?; + set_socket_default_flags(socket.as_fd()).map_err(IPCError::System)?; Ok(IPCConnector { socket }) } @@ -86,7 +82,7 @@ impl IPCConnector { /// command-line to a child process. This only works for newly /// created connectors because they are explicitly created as inheritable. pub fn serialize(&self) -> CString { - CString::new(self.as_raw().to_string()).unwrap() + CString::new(self.socket.as_raw_fd().to_string()).unwrap() } /// Deserialize a connector from an argument passed on the command-line. @@ -106,23 +102,19 @@ impl IPCConnector { self.socket.into_raw() } - pub(crate) fn as_raw(&self) -> RawFd { - self.socket.as_raw_fd() - } - - pub fn key(&self) -> IPCConnectorKey { - self.socket.as_raw_fd() + pub fn as_raw_ref(&self) -> BorrowedFd<'_> { + self.socket.as_fd() } - fn poll(&self, flags: PollFlags) -> Result<(), PlatformError> { + pub fn poll(&self, flags: PollFlags) -> Result<(), Errno> { let timeout = PollTimeout::from(IO_TIMEOUT); let res = ignore_eintr!(poll( &mut [PollFd::new(self.socket.as_fd(), flags)], timeout )); match res { - Err(e) => Err(PlatformError::PollFailure(e)), - Ok(_res @ 0) => Err(PlatformError::PollFailure(Errno::EAGAIN)), + Err(e) => Err(e), + Ok(_res @ 0) => Err(Errno::EAGAIN), Ok(_) => Ok(()), } } @@ -142,28 +134,23 @@ impl IPCConnector { where T: Message, { - // HACK: Workaround for a macOS-specific bug - #[cfg(target_os = "macos")] - self.poll(PollFlags::POLLIN) - .map_err(IPCError::ReceptionFailure)?; - let header = self.recv_header()?; if header.kind != T::kind() { - return Err(IPCError::UnexpectedMessage(header.kind)); + return Err(IPCError::ReceptionFailure(Errno::EBADMSG)); } - let (data, _) = self.recv(header.size)?; + let (data, _) = self.recv(header.size).map_err(IPCError::ReceptionFailure)?; T::decode(&data, None).map_err(IPCError::from) } - fn send_nonblock(&self, buff: &[u8], fd: &Option<AncillaryData>) -> Result<(), PlatformError> { + fn send_nonblock(&self, buff: &[u8], fd: &Option<AncillaryData>) -> Result<(), Errno> { let iov = [IoSlice::new(buff)]; let scm_fds: Vec<i32> = fd.iter().map(|fd| fd.as_raw_fd()).collect(); let scm = ControlMessage::ScmRights(&scm_fds); let res = ignore_eintr!(sendmsg::<()>( - self.as_raw(), + self.socket.as_raw_fd(), &iov, &[scm], MsgFlags::empty(), @@ -175,20 +162,19 @@ impl IPCConnector { if bytes_sent == buff.len() { Ok(()) } else { - Err(PlatformError::SendTooShort { - expected: buff.len(), - sent: bytes_sent, - }) + // TODO: This should never happen but we might want to put a + // better error message here. + Err(Errno::EMSGSIZE) } } - Err(code) => Err(PlatformError::SendFailure(code)), + Err(code) => Err(code), } } - fn send(&self, buff: &[u8], fd: Option<AncillaryData>) -> Result<(), PlatformError> { + fn send(&self, buff: &[u8], fd: Option<AncillaryData>) -> Result<(), Errno> { let res = self.send_nonblock(buff, &fd); match res { - Err(PlatformError::SendFailure(Errno::EAGAIN)) => { + Err(_code @ Errno::EAGAIN) => { // If the socket was not ready to send data wait for it to // become unblocked then retry sending just once. self.poll(PollFlags::POLLOUT)?; @@ -198,21 +184,23 @@ impl IPCConnector { } } - pub(crate) fn recv_header(&self) -> Result<messages::Header, IPCError> { - let (header, _) = self.recv(messages::HEADER_SIZE)?; + pub fn recv_header(&self) -> Result<messages::Header, IPCError> { + let (header, _) = self + .recv(messages::HEADER_SIZE) + .map_err(IPCError::ReceptionFailure)?; messages::Header::decode(&header).map_err(IPCError::BadMessage) } fn recv_nonblock( &self, expected_size: usize, - ) -> Result<(Vec<u8>, Option<AncillaryData>), PlatformError> { + ) -> Result<(Vec<u8>, Option<AncillaryData>), Errno> { let mut buff: Vec<u8> = vec![0; expected_size]; let mut cmsg_buffer = cmsg_space!(RawFd); let mut iov = [IoSliceMut::new(&mut buff)]; let res = ignore_eintr!(recvmsg::<()>( - self.as_raw(), + self.socket.as_raw_fd(), &mut iov, Some(&mut cmsg_buffer), MsgFlags::empty(), @@ -228,12 +216,12 @@ impl IPCConnector { let res = match res { #[cfg(target_os = "macos")] Err(_code @ Errno::ENOMEM) => ignore_eintr!(recvmsg::<()>( - self.as_raw(), + self.socket.as_raw_fd(), &mut iov, Some(&mut cmsg_buffer), MsgFlags::empty(), ))?, - Err(e) => return Err(PlatformError::ReceiveFailure(e)), + Err(e) => return Err(e), Ok(val) => val, }; @@ -241,34 +229,31 @@ impl IPCConnector { if let ControlMessageOwned::ScmRights(fds) = cmsg { fds.first().map(|&fd| unsafe { OwnedFd::from_raw_fd(fd) }) } else { - return Err(PlatformError::ReceiveMissingCredentials); + return Err(Errno::EBADMSG); } } else { None }; if res.bytes != expected_size { - return Err(PlatformError::ReceiveTooShort { - expected: expected_size, - received: res.bytes, - }); + // TODO: This should only ever happen if the other side has gone rogue, + // we need a better error message here. + return Err(Errno::EBADMSG); } Ok((buff, fd)) } - pub fn recv(&self, expected_size: usize) -> Result<(Vec<u8>, Option<AncillaryData>), IPCError> { + pub fn recv(&self, expected_size: usize) -> Result<(Vec<u8>, Option<AncillaryData>), Errno> { let res = self.recv_nonblock(expected_size); match res { - Err(PlatformError::ReceiveFailure(Errno::EAGAIN)) => { + Err(_code @ Errno::EAGAIN) => { // If the socket was not ready to receive data wait for it to // become unblocked then retry receiving just once. - self.poll(PollFlags::POLLIN) - .map_err(IPCError::ReceptionFailure)?; + self.poll(PollFlags::POLLIN)?; self.recv_nonblock(expected_size) } _ => res, } - .map_err(IPCError::ReceptionFailure) } } diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_connector/windows.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_connector/windows.rs @@ -4,28 +4,24 @@ use crate::{ errors::IPCError, - messages::{self, Message, HEADER_SIZE}, - platform::windows::{ - create_manual_reset_event, get_last_error, OverlappedOperation, PlatformError, - }, + messages::{self, Message}, + platform::windows::{create_manual_reset_event, get_last_error, OverlappedOperation}, IntoRawAncillaryData, IO_TIMEOUT, }; use std::{ ffi::{CStr, OsString}, io::Error, - os::windows::io::{ - AsHandle, AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle, - }, + os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle}, ptr::null_mut, - rc::Rc, str::FromStr, time::{Duration, Instant}, }; use windows_sys::Win32::{ Foundation::{ - DuplicateHandle, DUPLICATE_CLOSE_SOURCE, DUPLICATE_SAME_ACCESS, ERROR_FILE_NOT_FOUND, - ERROR_PIPE_BUSY, FALSE, HANDLE, INVALID_HANDLE_VALUE, + DuplicateHandle, GetLastError, DUPLICATE_CLOSE_SOURCE, DUPLICATE_SAME_ACCESS, + ERROR_FILE_NOT_FOUND, ERROR_INVALID_MESSAGE, ERROR_PIPE_BUSY, FALSE, HANDLE, + INVALID_HANDLE_VALUE, WAIT_TIMEOUT, }, Security::SECURITY_ATTRIBUTES, Storage::FileSystem::{ @@ -70,13 +66,13 @@ fn extract_buffer_and_handle(buffer: Vec<u8>) -> Result<(Vec<u8>, Option<OwnedHa Ok((data.to_vec(), handle)) } -pub type IPCConnectorKey = usize; - pub struct IPCConnector { /// A connected pipe handle - handle: Rc<OwnedHandle>, + handle: OwnedHandle, /// A handle to an event which will be used for overlapped I/O on the pipe event: OwnedHandle, + /// Stores the only pending operation we might have on the pipe + overlapped: Option<OverlappedOperation>, /// The process at the other end of the pipe, this is needed to send /// ancillary data and a send operation will fail if not set. process: Option<OwnedHandle>, @@ -84,11 +80,12 @@ pub struct IPCConnector { impl IPCConnector { pub fn from_ancillary(handle: OwnedHandle) -> Result<IPCConnector, IPCError> { - let event = create_manual_reset_event().map_err(IPCError::CreationFailure)?; + let event = create_manual_reset_event()?; Ok(IPCConnector { - handle: Rc::new(handle), + handle, event, + overlapped: None, process: None, }) } @@ -109,12 +106,8 @@ impl IPCConnector { self.process = Some(process); } - pub(crate) fn as_raw(&self) -> HANDLE { - self.handle.as_raw_handle() as HANDLE - } - - pub fn key(&self) -> IPCConnectorKey { - self.handle.as_raw_handle() as IPCConnectorKey + pub fn event_raw_handle(&self) -> HANDLE { + self.event.as_raw_handle() as HANDLE } pub fn connect(server_addr: &CStr) -> Result<IPCConnector, IPCError> { @@ -150,10 +143,10 @@ impl IPCConnector { let elapsed = now.elapsed(); if elapsed >= timeout { - return Err(IPCError::Timeout); + return Err(IPCError::System(WAIT_TIMEOUT)); // TODO: We need a dedicated error } - let error = get_last_error(); + let error = unsafe { GetLastError() }; // The pipe might have not been created yet or it might be busy. if (error == ERROR_FILE_NOT_FOUND) || (error == ERROR_PIPE_BUSY) { @@ -164,14 +157,14 @@ impl IPCConnector { (timeout - elapsed).as_millis() as u32, ) }; - let error = get_last_error(); + let error = unsafe { GetLastError() }; // If the pipe hasn't been created yet loop over and try again if (res == FALSE) && (error != ERROR_FILE_NOT_FOUND) { - return Err(IPCError::ConnectionFailure(error)); + return Err(IPCError::System(error)); } } else { - return Err(IPCError::ConnectionFailure(error)); + return Err(IPCError::System(error)); } } @@ -188,7 +181,7 @@ impl IPCConnector { ) }; if res == FALSE { - return Err(IPCError::ConnectionFailure(get_last_error())); + return Err(IPCError::System(unsafe { GetLastError() })); } // SAFETY: We've verified above that the pipe handle is valid @@ -212,88 +205,111 @@ impl IPCConnector { } pub fn into_ancillary(self) -> AncillaryData { - Rc::try_unwrap(self.handle).expect("Multiple references to the underlying handle") + self.handle } pub fn into_raw_ancillary(self) -> RawAncillaryData { - self.into_ancillary().into_raw() + self.handle.into_raw() } pub fn send_message<T>(&self, message: T) -> Result<(), IPCError> where T: Message, { - self.send_message_internal(message) - .map_err(IPCError::TransmissionFailure) - } - - fn send_message_internal<T>(&self, message: T) -> Result<(), PlatformError> - where - T: Message, - { - let header = message.header(); - let (payload, ancillary_data) = message.into_payload(); - // Send the message header - OverlappedOperation::send(&self.handle, self.event.as_handle(), header)?; - - // Send the message payload plus the optional handles - let handle = if let Some(handle) = ancillary_data { - self.clone_handle(handle)? - } else { - INVALID_ANCILLARY_DATA - }; + self.send(&message.header(), None)?; - let mut buffer = Vec::<u8>::with_capacity(HANDLE_SIZE + payload.len()); - buffer.extend(handle.to_ne_bytes()); - buffer.extend(payload); + // Send the message payload + let (payload, ancillary_data) = message.into_payload(); + self.send(&payload, ancillary_data)?; - OverlappedOperation::send(&self.handle, self.event.as_handle(), buffer) + Ok(()) } pub fn recv_reply<T>(&self) -> Result<T, IPCError> where T: Message, { - let header = self - .recv_buffer(messages::HEADER_SIZE) - .map_err(IPCError::ReceptionFailure)?; - let header = messages::Header::decode(&header).map_err(IPCError::BadMessage)?; + let header = self.recv_header()?; if header.kind != T::kind() { - return Err(IPCError::UnexpectedMessage(header.kind)); + return Err(IPCError::ReceptionFailure(ERROR_INVALID_MESSAGE)); } - let (buffer, handle) = self.recv(header.size)?; - T::decode(&buffer, handle).map_err(IPCError::from) + let (data, _) = self.recv(header.size)?; + T::decode(&data, None).map_err(IPCError::from) } - pub(crate) fn sched_recv_header(&self) -> Result<OverlappedOperation, IPCError> { - OverlappedOperation::sched_recv(&self.handle, HEADER_SIZE) - .map_err(IPCError::ReceptionFailure) + fn recv_header(&self) -> Result<messages::Header, IPCError> { + let (header, _) = self.recv(messages::HEADER_SIZE)?; + messages::Header::decode(&header).map_err(IPCError::BadMessage) } - pub(crate) fn recv( - &self, - expected_size: usize, - ) -> Result<(Vec<u8>, Option<AncillaryData>), IPCError> { - let buffer = self - .recv_buffer(HANDLE_SIZE + expected_size) - .map_err(IPCError::ReceptionFailure)?; - extract_buffer_and_handle(buffer) + pub fn sched_recv_header(&mut self) -> Result<(), IPCError> { + if self.overlapped.is_some() { + // We're already waiting for a header. + return Ok(()); + } + + self.overlapped = Some(OverlappedOperation::sched_recv( + self.handle + .try_clone() + .map_err(IPCError::CloneHandleFailed)?, + self.event_raw_handle(), + HANDLE_SIZE + messages::HEADER_SIZE, + )?); + Ok(()) + } + + pub fn collect_header(&mut self) -> Result<messages::Header, IPCError> { + // We should never call collect_header() on a connector that wasn't + // waiting for one, so panic in that scenario. + let overlapped = self.overlapped.take().unwrap(); + let buffer = overlapped.collect_recv(/* wait */ false)?; + let (data, _) = extract_buffer_and_handle(buffer)?; + messages::Header::decode(data.as_ref()).map_err(IPCError::BadMessage) + } + + fn send(&self, buff: &[u8], handle: Option<AncillaryData>) -> Result<(), IPCError> { + let handle = if let Some(handle) = handle { + self.clone_handle(handle)? + } else { + INVALID_ANCILLARY_DATA + }; + + let mut buffer = Vec::<u8>::with_capacity(HANDLE_SIZE + buff.len()); + buffer.extend(handle.to_ne_bytes()); + buffer.extend(buff); + + let overlapped = OverlappedOperation::sched_send( + self.handle + .try_clone() + .map_err(IPCError::CloneHandleFailed)?, + self.event_raw_handle(), + buffer, + )?; + overlapped.complete_send(/* wait */ true) } - fn recv_buffer(&self, expected_size: usize) -> Result<Vec<u8>, PlatformError> { - OverlappedOperation::recv(&self.handle, self.event.as_handle(), expected_size) + pub fn recv(&self, expected_size: usize) -> Result<(Vec<u8>, Option<AncillaryData>), IPCError> { + let overlapped = OverlappedOperation::sched_recv( + self.handle + .try_clone() + .map_err(IPCError::CloneHandleFailed)?, + self.event_raw_handle(), + HANDLE_SIZE + expected_size, + )?; + let buffer = overlapped.collect_recv(/* wait */ true)?; + extract_buffer_and_handle(buffer) } /// Clone a handle in the destination process, this is required to /// transfer handles over this connector. Note that this consumes the /// incoming handle because we want it to be closed after it's been cloned /// over to the other process. - fn clone_handle(&self, handle: OwnedHandle) -> Result<HANDLE, PlatformError> { + fn clone_handle(&self, handle: OwnedHandle) -> Result<HANDLE, IPCError> { let Some(dst_process) = self.process.as_ref() else { - return Err(PlatformError::MissingProcessHandle); + return Err(IPCError::MissingProcessHandle); }; let mut dst_handle: HANDLE = INVALID_ANCILLARY_DATA; let res = unsafe { @@ -309,7 +325,7 @@ impl IPCConnector { }; if res == 0 { - return Err(PlatformError::CloneHandleFailed(Error::from_raw_os_error( + return Err(IPCError::CloneHandleFailed(Error::from_raw_os_error( get_last_error() as i32, ))); } diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_listener.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_listener.rs @@ -2,30 +2,6 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use thiserror::Error; - -use crate::{errors::IPCError, platform::PlatformError}; - -/***************************************************************************** - * Error definitions * - *****************************************************************************/ - -#[derive(Debug, Error)] -pub enum IPCListenerError { - #[error("Could not accept incoming connection: {0}")] - AcceptError(PlatformError), - #[error("Issue with an underlying connector: {0}")] - ConnectorError(#[from] IPCError), - #[error("Could not create listener: {0}")] - CreationError(PlatformError), - #[error("Could not listen for incoming connections: {0}")] - ListenError(PlatformError), - #[error("Could not parse handle: {0}")] - ParseError(String), - #[error("Could not create a new pipe: {0}")] - PipeCreationFailure(PlatformError), -} - /***************************************************************************** * Windows * *****************************************************************************/ diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_listener/windows.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_listener/windows.rs @@ -4,21 +4,18 @@ use crate::{ errors::IPCError, - ipc_listener::IPCListenerError, - platform::windows::{get_last_error, server_addr, OverlappedOperation, PlatformError}, + platform::windows::{create_manual_reset_event, server_addr, OverlappedOperation}, IPCConnector, Pid, }; use std::{ - cell::RefCell, ffi::{CStr, CString, OsString}, os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle}, ptr::null_mut, - rc::Rc, str::FromStr, }; use windows_sys::Win32::{ - Foundation::{HANDLE, INVALID_HANDLE_VALUE, TRUE}, + Foundation::{GetLastError, HANDLE, INVALID_HANDLE_VALUE, TRUE}, Security::SECURITY_ATTRIBUTES, Storage::FileSystem::{ FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, @@ -30,86 +27,89 @@ use windows_sys::Win32::{ }; pub struct IPCListener { - /// The name of the pipe this listener will be bound to server_addr: CString, - /// A named pipe handle listening for incoming connections - handle: RefCell<Rc<OwnedHandle>>, - /// Stores the only listen operation that might be pending + handle: OwnedHandle, overlapped: Option<OverlappedOperation>, + event: OwnedHandle, } impl IPCListener { - pub(crate) fn new(server_addr: CString) -> Result<IPCListener, IPCListenerError> { - let pipe = create_named_pipe(&server_addr, /* first_instance */ true) - .map_err(IPCListenerError::PipeCreationFailure)?; + pub(crate) fn new(server_addr: CString) -> Result<IPCListener, IPCError> { + let pipe = create_named_pipe(&server_addr, /* first_instance */ true)?; + let event = create_manual_reset_event()?; Ok(IPCListener { server_addr, - handle: RefCell::new(Rc::new(pipe)), + handle: pipe, overlapped: None, + event, }) } - pub(crate) fn as_raw(&self) -> HANDLE { - self.handle.borrow().as_raw_handle() as HANDLE + pub fn event_raw_handle(&self) -> HANDLE { + self.event.as_raw_handle() as HANDLE } pub(crate) fn address(&self) -> &CStr { &self.server_addr } - pub(crate) fn sched_listen(&self) -> Result<OverlappedOperation, IPCListenerError> { - OverlappedOperation::listen(&self.handle.borrow()).map_err(IPCListenerError::ListenError) - } - - pub(crate) fn listen(&mut self) -> Result<(), IPCListenerError> { - self.overlapped = Some(self.sched_listen()?); + pub(crate) fn listen(&mut self) -> Result<(), IPCError> { + self.overlapped = Some(OverlappedOperation::listen( + self.handle + .try_clone() + .map_err(IPCError::CloneHandleFailed)?, + self.event_raw_handle(), + )?); Ok(()) } - pub fn accept(&mut self) -> Result<IPCConnector, IPCListenerError> { - let overlapped = self - .overlapped - .take() - .expect("Accepting a connection without listening first"); - overlapped.accept().map_err(IPCListenerError::AcceptError)?; - self.replace_pipe() - } + pub fn accept(&mut self) -> Result<IPCConnector, IPCError> { + // We should never call accept() on a listener that wasn't + // already waiting, so panic in that scenario. + let overlapped = self.overlapped.take().unwrap(); + overlapped.accept(self.handle.as_raw_handle() as HANDLE)?; + let new_pipe = create_named_pipe(&self.server_addr, /* first_instance */ false)?; + let connected_pipe = std::mem::replace(&mut self.handle, new_pipe); - pub(crate) fn replace_pipe(&self) -> Result<IPCConnector, IPCListenerError> { - let new_pipe = create_named_pipe(&self.server_addr, /* first_instance */ false) - .map_err(IPCListenerError::PipeCreationFailure)?; - let connected_pipe = self.handle.replace(Rc::new(new_pipe)); + // Once we've accepted a new connection and replaced the listener's + // pipe we need to listen again before we return, so that we're ready + // for the next iteration. + self.listen()?; - // We can guarantee that there's only one reference to this handle at - // this point in time. - Ok(IPCConnector::from_ancillary( - Rc::<OwnedHandle>::try_unwrap(connected_pipe).unwrap(), - )?) + IPCConnector::from_ancillary(connected_pipe) } /// Serialize this listener into a string that can be passed on the /// command-line to a child process. This only works for newly /// created listeners because they are explicitly created as inheritable. pub fn serialize(&self) -> OsString { - let raw_handle = self.handle.borrow().as_raw_handle() as usize; + let raw_handle = self.handle.as_raw_handle() as usize; OsString::from_str(raw_handle.to_string().as_ref()).unwrap() } /// Deserialize a listener from an argument passed on the command-line. /// The resulting listener is ready to accept new connections. - pub fn deserialize(string: &CStr, pid: Pid) -> Result<IPCListener, IPCListenerError> { + pub fn deserialize(string: &CStr, pid: Pid) -> Result<IPCListener, IPCError> { let server_addr = server_addr(pid); let string = string.to_str().map_err(|_e| IPCError::ParseError)?; let handle = usize::from_str(string).map_err(|_e| IPCError::ParseError)?; // SAFETY: This is a handle we passed in ourselves. let handle = unsafe { OwnedHandle::from_raw_handle(handle as RawHandle) }; + let event = create_manual_reset_event()?; - Ok(IPCListener { + let mut listener = IPCListener { server_addr, - handle: RefCell::new(Rc::new(handle)), + handle, overlapped: None, - }) + event, + }; + + // Since we've inherited this handler we need to start a new + // asynchronous operation to listen for incoming connections. + listener.listen()?; + + Ok(listener) } } @@ -118,10 +118,7 @@ impl IPCListener { // used internally and never visible externally. unsafe impl Send for IPCListener {} -fn create_named_pipe( - server_addr: &CStr, - first_instance: bool, -) -> Result<OwnedHandle, PlatformError> { +fn create_named_pipe(server_addr: &CStr, first_instance: bool) -> Result<OwnedHandle, IPCError> { const PIPE_BUFFER_SIZE: u32 = 4096; let open_mode = PIPE_ACCESS_DUPLEX @@ -154,7 +151,7 @@ fn create_named_pipe( }; if pipe == INVALID_HANDLE_VALUE { - return Err(PlatformError::CreatePipeFailure(get_last_error())); + return Err(IPCError::System(unsafe { GetLastError() })); } // SAFETY: We just verified that the handle is valid. diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_queue.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_queue.rs @@ -1,53 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ - -use thiserror::Error; - -use crate::{ - errors::{IPCError, SystemError}, - messages::MessageError, - IPCListenerError, -}; - -/***************************************************************************** - * Error definitions * - *****************************************************************************/ - -#[derive(Debug, Error)] -pub enum IPCQueueError { - #[error("Could not create queue: {0}")] - CreationFailure(SystemError), - #[error("Could not register with queue: {0}")] - RegistrationFailure(SystemError), - #[error("Could not post an event on the queue: {0}")] - PostEventFailure(SystemError), - #[error("Could not wait for events: {0}")] - WaitError(SystemError), - #[error("Underlying IPC connector error: {0}")] - IPCError(#[from] IPCError), - #[error("Underlying IPC listener error: {0}")] - IPCListenerError(#[from] IPCListenerError), - #[error("Underlying message error: {0}")] - MessageError(#[from] MessageError), -} - -/***************************************************************************** - * Windows * - *****************************************************************************/ - -#[cfg(target_os = "windows")] -pub use windows::IPCQueue; - -#[cfg(target_os = "windows")] -pub(crate) mod windows; - -/***************************************************************************** - * Android, macOS & Linux * - *****************************************************************************/ - -#[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] -pub use unix::IPCQueue; - -#[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] -pub(crate) mod unix; diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_queue/unix.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_queue/unix.rs @@ -1,93 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ - -use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; -use std::{collections::HashMap, os::fd::BorrowedFd, rc::Rc}; - -use crate::{ - ignore_eintr, ipc_queue::IPCQueueError, IPCConnector, IPCConnectorKey, IPCEvent, IPCListener, -}; - -pub struct IPCQueue { - connectors: HashMap<IPCConnectorKey, Rc<IPCConnector>>, -} - -impl IPCQueue { - pub fn new(_listener: IPCListener) -> Result<IPCQueue, IPCQueueError> { - let connectors = HashMap::with_capacity(10); - Ok(IPCQueue { connectors }) - } - - pub fn add_connector(&mut self, connector: &Rc<IPCConnector>) -> Result<(), IPCQueueError> { - let res = self.connectors.insert(connector.key(), connector.clone()); - debug_assert!(res.is_none()); - Ok(()) - } - - pub fn add_listener(&self, _listener: &IPCListener) -> Result<(), IPCQueueError> { - Ok(()) - } - - pub fn wait_for_events(&mut self) -> Result<Vec<IPCEvent>, IPCQueueError> { - let mut pollfds = Vec::with_capacity(self.connectors.len()); - // SAFETY: All the fds held by the queue are known to be valid. - pollfds.extend(self.connectors.iter().map(|connector| { - PollFd::new( - unsafe { BorrowedFd::borrow_raw(connector.1.as_raw()) }, - PollFlags::POLLIN, - ) - })); - - let mut events = Vec::<IPCEvent>::new(); - let mut num_events = ignore_eintr!(poll(&mut pollfds, PollTimeout::NONE)) - .map_err(IPCQueueError::WaitError)?; - - for (pollfd, (&key, connector)) in pollfds.iter().zip(&self.connectors) { - // revents() returns None only if the kernel sends back data - // that nix does not understand, we can safely assume this - // never happens in practice hence the unwrap(). - let Some(revents) = pollfd.revents() else { - // TODO: We should log this error, disconnect the socket or do - // both things. Probably needs a new event type. - continue; - }; - - if revents.contains(PollFlags::POLLHUP) { - events.push(IPCEvent::Disconnect(key)); - // If a process was disconnected then skip all further - // processing of the socket. This wouldn't matter normally, - // but on macOS calling recvmsg() on a hung-up socket seems - // to trigger a kernel panic, one we've already encountered - // in the past. Doing things this way avoids the panic - // while having no real downsides. - continue; - } - - if revents.contains(PollFlags::POLLIN) { - let header = connector.recv_header()?; - let payload = connector - .recv(header.size) - .map_err(IPCQueueError::IPCError)?; - events.push(IPCEvent::Message(key, header, payload.0, payload.1)); - } - - if !revents.is_empty() { - num_events -= 1; - - if num_events == 0 { - break; - } - } - } - - // Remove all connectors for which we've received disconnect events. - for event in &events { - if let IPCEvent::Disconnect(key) = event { - self.connectors.remove(key); - } - } - - Ok(events) - } -} diff --git a/toolkit/crashreporter/crash_helper_common/src/ipc_queue/windows.rs b/toolkit/crashreporter/crash_helper_common/src/ipc_queue/windows.rs @@ -1,281 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ - -use std::{ - collections::HashMap, - mem::MaybeUninit, - os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle}, - ptr::null_mut, - rc::Rc, -}; -use windows_sys::Win32::{ - Foundation::{ERROR_BROKEN_PIPE, FALSE, HANDLE, INVALID_HANDLE_VALUE}, - System::{ - Threading::INFINITE, - IO::{CreateIoCompletionPort, GetQueuedCompletionStatus, OVERLAPPED}, - }, -}; - -use crate::{ - errors::IPCError, - ipc_queue::IPCQueueError, - messages::Header, - platform::{ - windows::{get_last_error, OverlappedOperation}, - PlatformError, - }, - IPCConnector, IPCConnectorKey, IPCEvent, IPCListener, -}; - -const CONCURRENT_THREADS: u32 = 1; - -struct IPCQueueElement { - connector: Rc<IPCConnector>, - operation: Option<OverlappedOperation>, -} - -pub struct IPCQueue { - connectors: HashMap<IPCConnectorKey, IPCQueueElement>, - listener: IPCListener, - listen_operation: Option<OverlappedOperation>, - port: OwnedHandle, -} - -impl IPCQueue { - pub fn new(listener: IPCListener) -> Result<IPCQueue, IPCQueueError> { - let listener_port = listener.as_raw(); - - // Create a new completion port that allows only one active thread. - let port = unsafe { - CreateIoCompletionPort( - /* FileHandle */ INVALID_HANDLE_VALUE, - /* ExistingCompletionPort */ 0, - /* CompletionKey */ 0, - CONCURRENT_THREADS, - ) as RawHandle - }; - - if port.is_null() { - return Err(IPCQueueError::CreationFailure(get_last_error())); - } - - let mut queue = IPCQueue { - connectors: HashMap::with_capacity(10), - listener, - listen_operation: None, - port: unsafe { OwnedHandle::from_raw_handle(port) }, - }; - - queue.add_handle(listener_port)?; - - Ok(queue) - } - - pub fn add_connector(&mut self, connector: &Rc<IPCConnector>) -> Result<(), IPCQueueError> { - self.add_handle(connector.as_raw())?; - self.insert_connector(connector); - Ok(()) - } - - fn insert_connector(&mut self, connector: &Rc<IPCConnector>) { - let res = self.connectors.insert( - connector.key(), - IPCQueueElement { - connector: connector.clone(), - operation: None, - }, - ); - debug_assert!(res.is_none()); - } - - fn add_handle(&mut self, handle: HANDLE) -> Result<(), IPCQueueError> { - let port = unsafe { - CreateIoCompletionPort( - handle, - self.port.as_raw_handle() as HANDLE, - // Use the connector's handle as the events' key - handle as usize, - CONCURRENT_THREADS, - ) as RawHandle - }; - - if port.is_null() { - return Err(IPCQueueError::RegistrationFailure(get_last_error())); - } - - Ok(()) - } - - pub fn wait_for_events(&mut self) -> Result<Vec<IPCEvent>, IPCQueueError> { - let mut events = Vec::with_capacity(1); - - for element in self.connectors.values_mut() { - if element.operation.is_none() { - match element.connector.sched_recv_header() { - Ok(operation) => element.operation = Some(operation), - Err(_error @ IPCError::ReceptionFailure(PlatformError::BrokenPipe)) => { - events.push(IPCEvent::Disconnect(element.connector.key())); - } - Err(error) => return Err(IPCQueueError::from(error)), - } - } - } - - for event in &events { - if let IPCEvent::Disconnect(key) = event { - self.connectors.remove(key); - } - } - - if self.connectors.len() == 0 { - // The last client disconnected. - return Ok(events); - } - - if self.listen_operation.is_none() { - self.listen_operation = Some(self.listener.sched_listen()?); - } - - let mut number_of_bytes_transferred = MaybeUninit::<u32>::uninit(); - let mut completion_key = MaybeUninit::<IPCConnectorKey>::uninit(); - let mut overlapped = MaybeUninit::<*mut OVERLAPPED>::uninit(); - - let res = unsafe { - GetQueuedCompletionStatus( - self.port.as_raw_handle() as HANDLE, - number_of_bytes_transferred.as_mut_ptr(), - completion_key.as_mut_ptr(), - overlapped.as_mut_ptr(), - INFINITE, - ) - }; - - // SAFETY: `overlapped` will always be populated by - // `GetQueueCompletionStatus()` so it's safe to assume initialization. - let overlapped = unsafe { overlapped.assume_init() }; - - if res == FALSE { - let err = get_last_error(); - - // If `overlapped` is non-null then the completion packet contained - // the result of a failed I/O operation. We only handle failures - // caused by a broken pipes, all others are considered fatal. - if !overlapped.is_null() && (err == ERROR_BROKEN_PIPE) { - // SAFETY: `overlapped` was non-null, so `completion_key` has - // also been populated by `GetQueuedCompletionStatus()`. - let completion_key = unsafe { completion_key.assume_init() }; - let element = self.connectors.remove(&completion_key); - debug_assert!(element.is_some(), "Completion on missing connector"); - events.push(IPCEvent::Disconnect(completion_key)); - } else { - return Err(IPCQueueError::WaitError(err)); - } - } else { - // SAFETY: `GetQueueCompletionStatus()` successfully retrieved a - // completed I/O operation, all parameters have been populated. - let (number_of_bytes_transferred, completion_key) = unsafe { - ( - number_of_bytes_transferred.assume_init(), - completion_key.assume_init(), - ) - }; - - if number_of_bytes_transferred == 0 { - // This is an event on the listener - debug_assert!( - self.listener.as_raw() as IPCConnectorKey == completion_key, - "Completion event doesn't match the listener" - ); - let operation = self.listen_operation.take(); - if let Some(operation) = operation { - operation - .accept() - .map_err(|_e| IPCQueueError::RegistrationFailure(0))?; - } - let connector = Rc::new(self.listener.replace_pipe()?); - self.insert_connector(&connector); - - // After the pipe is connected the listener handle will have been - // replaced with a new one, so associate the new handle with the - // completion queue. - self.add_handle(self.listener.as_raw())?; - - events.push(IPCEvent::Connect(connector)); - } else { - let element = self - .connectors - .get_mut(&completion_key) - .expect("Event did not match a known connector"); - let operation = element - .operation - .take() - .expect("No pending receive operation"); - let buffer = &operation.collect_recv(); - let header = Header::decode(buffer)?; - let payload = element.connector.recv(header.size); - match payload { - Ok(payload) => { - events.push(IPCEvent::Message( - completion_key, - header, - payload.0, - payload.1, - )); - } - Err(_error @ IPCError::ReceptionFailure(PlatformError::BrokenPipe)) => { - // This connector will generate a disconnection event - // when `wait_for_events()` is called again. Do nothing - // for the time being. - } - Err(error) => return Err(IPCQueueError::from(error)), - } - } - } - - Ok(events) - } -} - -impl Drop for IPCQueue { - fn drop(&mut self) { - // Cancel all the pending operations. - for element in self.connectors.values_mut() { - if let Some(operation) = &mut element.operation { - if !operation.cancel() { - operation.leak(); - } - } - } - - if let Some(operation) = &mut self.listen_operation { - if !operation.cancel() { - operation.leak(); - } - } - - // Drain the queue, once no more events are left we can safely drop it. - loop { - let mut number_of_bytes_transferred: u32 = 0; - let mut completion_key: IPCConnectorKey = 0; - let mut overlapped: *mut OVERLAPPED = null_mut(); - - let res = unsafe { - GetQueuedCompletionStatus( - self.port.as_raw_handle() as HANDLE, - &mut number_of_bytes_transferred, - &mut completion_key, - &mut overlapped, - 0, - ) - }; - - // TODO: Check that we got enough completion events? - - if res == FALSE && overlapped.is_null() { - // TODO: Maybe check the error and report odd ones? - break; - } - } - } -} diff --git a/toolkit/crashreporter/crash_helper_common/src/lib.rs b/toolkit/crashreporter/crash_helper_common/src/lib.rs @@ -11,20 +11,17 @@ mod breakpad; mod ipc_channel; mod ipc_connector; mod ipc_listener; -mod ipc_queue; mod platform; -use messages::MessageError; +use errors::MessageError; // Re-export the platform-specific types and functions pub use crate::breakpad::{BreakpadChar, BreakpadData, BreakpadRawData, Pid}; pub use crate::ipc_channel::{IPCChannel, IPCClientChannel}; pub use crate::ipc_connector::{ - AncillaryData, IPCConnector, IPCConnectorKey, IPCEvent, RawAncillaryData, - INVALID_ANCILLARY_DATA, + AncillaryData, IPCConnector, IPCEvent, RawAncillaryData, INVALID_ANCILLARY_DATA, }; -pub use crate::ipc_listener::{IPCListener, IPCListenerError}; -pub use crate::ipc_queue::IPCQueue; +pub use crate::ipc_listener::IPCListener; pub use crate::platform::ProcessHandle; #[cfg(target_os = "windows")] diff --git a/toolkit/crashreporter/crash_helper_common/src/messages.rs b/toolkit/crashreporter/crash_helper_common/src/messages.rs @@ -7,33 +7,13 @@ use minidump_writer::minidump_writer::{AuxvType, DirectAuxvDumpInfo}; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::FromPrimitive; use std::{ - array::TryFromSliceError, - ffi::{CString, FromBytesWithNulError, NulError, OsString}, + ffi::{CString, OsString}, mem::size_of, }; -use thiserror::Error; #[cfg(target_os = "windows")] use windows_sys::Win32::System::Diagnostics::Debug::{CONTEXT, EXCEPTION_RECORD}; -use crate::{breakpad::Pid, ipc_connector::AncillaryData, BreakpadString}; - -#[derive(Debug, Error)] -pub enum MessageError { - #[error("Nul terminator found within a string")] - InteriorNul(#[from] NulError), - #[error("The message contained an invalid payload")] - InvalidData, - #[error("Message kind is invalid")] - InvalidKind, - #[error("Invalid message size")] - InvalidSize(#[from] TryFromSliceError), - #[error("Missing ancillary data")] - MissingAncillary, - #[error("Missing nul terminator")] - MissingNul(#[from] FromBytesWithNulError), - #[error("Truncated message")] - Truncated, -} +use crate::{breakpad::Pid, errors::MessageError, ipc_connector::AncillaryData, BreakpadString}; #[repr(u8)] #[derive(Copy, Clone, Debug, FromPrimitive, ToPrimitive, PartialEq)] @@ -626,7 +606,9 @@ pub struct RegisterChildProcess { impl RegisterChildProcess { pub fn new(ipc_endpoint: AncillaryData) -> RegisterChildProcess { - RegisterChildProcess { ipc_endpoint } + RegisterChildProcess { + ipc_endpoint, + } } fn payload_size(&self) -> usize { diff --git a/toolkit/crashreporter/crash_helper_common/src/platform.rs b/toolkit/crashreporter/crash_helper_common/src/platform.rs @@ -3,19 +3,19 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ #[cfg(target_os = "windows")] -pub use windows::{server_addr, PlatformError, ProcessHandle}; +pub use windows::{server_addr, ProcessHandle}; #[cfg(target_os = "windows")] pub(crate) mod windows; #[cfg(any(target_os = "android", target_os = "linux"))] -pub use linux::{PlatformError, ProcessHandle}; +pub use linux::ProcessHandle; #[cfg(any(target_os = "android", target_os = "linux"))] pub(crate) mod linux; #[cfg(target_os = "macos")] -pub use macos::{PlatformError, ProcessHandle}; +pub use macos::ProcessHandle; #[cfg(target_os = "macos")] pub(crate) mod macos; diff --git a/toolkit/crashreporter/crash_helper_common/src/platform/linux.rs b/toolkit/crashreporter/crash_helper_common/src/platform/linux.rs @@ -3,61 +3,33 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ use nix::{ - errno::Errno, fcntl::{ fcntl, FcntlArg::{F_GETFL, F_SETFD, F_SETFL}, FdFlag, OFlag, }, sys::socket::{socketpair, AddressFamily, SockFlag, SockType}, + Result, }; use std::os::fd::{BorrowedFd, OwnedFd}; -use thiserror::Error; pub type ProcessHandle = (); -#[derive(Error, Debug)] -pub enum PlatformError { - #[error("poll() call failed with error: {0}")] - PollFailure(Errno), - #[error("Could not set socket in non-blocking mode: {0}")] - SocketNonBlockError(Errno), - #[error("Could not flag socket as close-after-exec: {0}")] - SocketCloexecError(Errno), - #[error("Could not create a socket pair: {0}")] - SocketpairFailure(#[from] Errno), - #[error("sendmsg() call failed with error: {0}")] - SendFailure(Errno), - #[error("Sending {expected} bytes failed, only {sent} bytes sent")] - SendTooShort { expected: usize, sent: usize }, - #[error("recvmsg() call failed with error: {0}")] - ReceiveFailure(Errno), - #[error("Missing SCM credentials")] - ReceiveMissingCredentials, - #[error("Receiving {expected} bytes failed, only {received} bytes received")] - ReceiveTooShort { expected: usize, received: usize }, -} - -pub(crate) fn unix_socketpair() -> Result<(OwnedFd, OwnedFd), PlatformError> { +pub(crate) fn unix_socketpair() -> Result<(OwnedFd, OwnedFd)> { socketpair( AddressFamily::Unix, SockType::SeqPacket, None, SockFlag::empty(), ) - .map_err(PlatformError::SocketpairFailure) } -pub(crate) fn set_socket_default_flags(socket: BorrowedFd) -> Result<(), PlatformError> { +pub(crate) fn set_socket_default_flags(socket: BorrowedFd) -> Result<()> { // All our sockets are in non-blocking mode. let flags = OFlag::from_bits_retain(fcntl(socket, F_GETFL)?); - fcntl(socket, F_SETFL(flags.union(OFlag::O_NONBLOCK))) - .map(|_res| ()) - .map_err(PlatformError::SocketNonBlockError) + fcntl(socket, F_SETFL(flags.union(OFlag::O_NONBLOCK))).map(|_res| ()) } -pub(crate) fn set_socket_cloexec(socket: BorrowedFd) -> Result<(), PlatformError> { - fcntl(socket, F_SETFD(FdFlag::FD_CLOEXEC)) - .map(|_res| ()) - .map_err(PlatformError::SocketCloexecError) +pub(crate) fn set_socket_cloexec(socket: BorrowedFd) -> Result<()> { + fcntl(socket, F_SETFD(FdFlag::FD_CLOEXEC)).map(|_res| ()) } diff --git a/toolkit/crashreporter/crash_helper_common/src/platform/macos.rs b/toolkit/crashreporter/crash_helper_common/src/platform/macos.rs @@ -11,48 +11,25 @@ use nix::{ }, libc::{setsockopt, SOL_SOCKET, SO_NOSIGPIPE}, sys::socket::{socketpair, AddressFamily, SockFlag, SockType}, + Result, }; use std::{ mem::size_of, os::fd::{AsRawFd, BorrowedFd, OwnedFd}, }; -use thiserror::Error; pub type ProcessHandle = (); -#[derive(Error, Debug)] -pub enum PlatformError { - #[error("poll() call failed with error: {0}")] - PollFailure(Errno), - #[error("Could not set socket in non-blocking mode: {0}")] - SocketNonBlockError(Errno), - #[error("Could not flag socket as close-after-exec: {0}")] - SocketCloexecError(Errno), - #[error("Could not create a socket pair: {0}")] - SocketpairFailure(#[from] Errno), - #[error("sendmsg() call failed with error: {0}")] - SendFailure(Errno), - #[error("Sending {expected} bytes failed, only {sent} bytes sent")] - SendTooShort { expected: usize, sent: usize }, - #[error("recvmsg() call failed with error: {0}")] - ReceiveFailure(Errno), - #[error("Missing SCM credentials")] - ReceiveMissingCredentials, - #[error("Receiving {expected} bytes failed, only {received} bytes received")] - ReceiveTooShort { expected: usize, received: usize }, -} - -pub(crate) fn unix_socketpair() -> Result<(OwnedFd, OwnedFd), PlatformError> { +pub(crate) fn unix_socketpair() -> Result<(OwnedFd, OwnedFd)> { socketpair( AddressFamily::Unix, SockType::Stream, None, SockFlag::empty(), ) - .map_err(PlatformError::SocketpairFailure) } -pub(crate) fn set_socket_default_flags(socket: BorrowedFd) -> Result<(), PlatformError> { +pub(crate) fn set_socket_default_flags(socket: BorrowedFd) -> Result<()> { // All our sockets are in non-blocking mode. let flags = OFlag::from_bits_retain(fcntl(socket, F_GETFL)?); fcntl(socket, F_SETFL(flags.union(OFlag::O_NONBLOCK)))?; @@ -71,14 +48,12 @@ pub(crate) fn set_socket_default_flags(socket: BorrowedFd) -> Result<(), Platfor }; if res < 0 { - return Err(PlatformError::SocketNonBlockError(Errno::last())); + return Err(Errno::last()); } Ok(()) } -pub(crate) fn set_socket_cloexec(socket: BorrowedFd) -> Result<(), PlatformError> { - fcntl(socket, F_SETFD(FdFlag::FD_CLOEXEC)) - .map(|_res| ()) - .map_err(PlatformError::SocketCloexecError) +pub(crate) fn set_socket_cloexec(socket: BorrowedFd) -> Result<()> { + fcntl(socket, F_SETFD(FdFlag::FD_CLOEXEC)).map(|_res| ()) } diff --git a/toolkit/crashreporter/crash_helper_common/src/platform/windows.rs b/toolkit/crashreporter/crash_helper_common/src/platform/windows.rs @@ -2,20 +2,19 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ -use crate::{Pid, IO_TIMEOUT}; +use crate::{ + errors::{IPCError, MessageError}, + Pid, IO_TIMEOUT, +}; use std::{ ffi::CString, - mem::{zeroed, MaybeUninit}, - os::windows::io::{ - AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, OwnedHandle, RawHandle, - }, + mem::zeroed, + os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle}, ptr::{null, null_mut}, - rc::Rc, }; -use thiserror::Error; use windows_sys::Win32::{ Foundation::{ - GetLastError, ERROR_BROKEN_PIPE, ERROR_IO_PENDING, ERROR_NOT_FOUND, ERROR_PIPE_CONNECTED, + GetLastError, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NOT_FOUND, ERROR_PIPE_CONNECTED, FALSE, HANDLE, WAIT_TIMEOUT, WIN32_ERROR, }, Storage::FileSystem::{ReadFile, WriteFile}, @@ -28,36 +27,6 @@ use windows_sys::Win32::{ pub type ProcessHandle = OwnedHandle; -#[derive(Error, Debug)] -pub enum PlatformError { - #[error("Could not accept incoming connection: {0}")] - AcceptFailed(WIN32_ERROR), - #[error("Broken pipe")] - BrokenPipe, - #[error("Failed to duplicate clone handle")] - CloneHandleFailed(#[source] std::io::Error), - #[error("Could not create event: {0}")] - CreateEventFailed(WIN32_ERROR), - #[error("Could not create a pipe: {0}")] - CreatePipeFailure(WIN32_ERROR), - #[error("I/O error: {0}")] - IOError(WIN32_ERROR), - #[error("No process handle specified")] - MissingProcessHandle, - #[error("Could not listen for incoming connections: {0}")] - ListenFailed(WIN32_ERROR), - #[error("Receiving {expected} bytes failed, only {received} bytes received")] - ReceiveTooShort { expected: usize, received: usize }, - #[error("Could not reset event: {0}")] - ResetEventFailed(WIN32_ERROR), - #[error("Sending {expected} bytes failed, only {sent} bytes sent")] - SendTooShort { expected: usize, sent: usize }, - #[error("Could not set event: {0}")] - SetEventFailed(WIN32_ERROR), - #[error("Value too large")] - ValueTooLarge, -} - pub(crate) fn get_last_error() -> WIN32_ERROR { // SAFETY: This is always safe to call unsafe { GetLastError() } @@ -68,7 +37,7 @@ pub fn server_addr(pid: Pid) -> CString { CString::new(format!("\\\\.\\pipe\\gecko-crash-helper-pipe.{pid:}")).unwrap() } -pub(crate) fn create_manual_reset_event() -> Result<OwnedHandle, PlatformError> { +pub(crate) fn create_manual_reset_event() -> Result<OwnedHandle, IPCError> { // SAFETY: We pass null pointers for all the pointer arguments. let raw_handle = unsafe { CreateEventA( @@ -80,36 +49,27 @@ pub(crate) fn create_manual_reset_event() -> Result<OwnedHandle, PlatformError> } as RawHandle; if raw_handle.is_null() { - return Err(PlatformError::CreateEventFailed(get_last_error())); + return Err(IPCError::System(get_last_error())); } // SAFETY: We just verified that `raw_handle` is valid. Ok(unsafe { OwnedHandle::from_raw_handle(raw_handle) }) } -fn set_event(handle: BorrowedHandle) -> Result<(), PlatformError> { +fn set_event(handle: HANDLE) -> Result<(), IPCError> { // SAFETY: This is always safe, even when passing an invalid handle. - if unsafe { SetEvent(handle.as_raw_handle() as HANDLE) } == FALSE { - Err(PlatformError::SetEventFailed(get_last_error())) + if unsafe { SetEvent(handle) } == FALSE { + Err(IPCError::System(get_last_error())) } else { Ok(()) } } -fn reset_event(handle: BorrowedHandle) -> Result<(), PlatformError> { - // SAFETY: This is always safe, even when passing an invalid handle. - if unsafe { ResetEvent(handle.as_raw_handle() as HANDLE) } == FALSE { - Err(PlatformError::ResetEventFailed(get_last_error())) - } else { - Ok(()) - } -} - -fn cancel_overlapped_io(handle: BorrowedHandle, overlapped: &OVERLAPPED) -> bool { +fn cancel_overlapped_io(handle: HANDLE, overlapped: &mut OVERLAPPED) -> bool { // SAFETY: the pointer to the overlapped structure is always valid as the // structure is passed by reference. The handle should be valid but will // be handled properly in case it isn't. - let res = unsafe { CancelIoEx(handle.as_raw_handle() as HANDLE, overlapped) }; + let res = unsafe { CancelIoEx(handle, overlapped) }; if res == FALSE { if get_last_error() == ERROR_NOT_FOUND { // There was no pending operation @@ -119,19 +79,14 @@ fn cancel_overlapped_io(handle: BorrowedHandle, overlapped: &OVERLAPPED) -> bool return false; } - if overlapped.hEvent == 0 { - // No associated event, don't wait - return true; - } - // Just wait for the operation to finish, we don't care about the result - let mut number_of_bytes_transferred = MaybeUninit::<u32>::uninit(); + let mut number_of_bytes_transferred: u32 = 0; // SAFETY: Same as above let res = unsafe { GetOverlappedResultEx( - handle.as_raw_handle() as HANDLE, + handle, overlapped, - number_of_bytes_transferred.as_mut_ptr(), + &mut number_of_bytes_transferred, INFINITE, /* bAlertable */ FALSE, ) @@ -141,7 +96,7 @@ fn cancel_overlapped_io(handle: BorrowedHandle, overlapped: &OVERLAPPED) -> bool } pub(crate) struct OverlappedOperation { - handle: Rc<OwnedHandle>, + handle: OwnedHandle, overlapped: Option<Box<OVERLAPPED>>, buffer: Option<Vec<u8>>, } @@ -152,9 +107,11 @@ enum OverlappedOperationType { } impl OverlappedOperation { - // Asynchronously listen for an incoming connection - pub(crate) fn listen(handle: &Rc<OwnedHandle>) -> Result<OverlappedOperation, PlatformError> { - let mut overlapped = Self::overlapped(); + pub(crate) fn listen( + handle: OwnedHandle, + event: HANDLE, + ) -> Result<OverlappedOperation, IPCError> { + let mut overlapped = Self::overlapped_with_event(event)?; // SAFETY: We guarantee that the handle and OVERLAPPED object are both // valid and remain so while used by this function. @@ -165,42 +122,47 @@ impl OverlappedOperation { if res != FALSE { // According to Microsoft's documentation this should never happen, // we check out of an abundance of caution. - return Err(PlatformError::ListenFailed(error)); + return Err(IPCError::System(error)); } - match error { - ERROR_PIPE_CONNECTED | ERROR_IO_PENDING => { - // The operations succeeded, we'll get a completion event - } - error => return Err(PlatformError::ListenFailed(error)), - }; + if error == ERROR_PIPE_CONNECTED { + // The operation completed synchronously, set the event so that + // waiting on it will return immediately. + set_event(event)?; + } else if error != ERROR_IO_PENDING { + return Err(IPCError::System(error)); + } Ok(OverlappedOperation { - handle: handle.clone(), + handle, overlapped: Some(overlapped), buffer: None, }) } - // Synchronously accept an incoming connection, does not wait and fails if - // no incoming connection is present. - pub(crate) fn accept(mut self) -> Result<(), PlatformError> { + pub(crate) fn accept(mut self, handle: HANDLE) -> Result<(), IPCError> { let overlapped = self.overlapped.take().unwrap(); - let mut number_of_bytes_transferred = MaybeUninit::<u32>::uninit(); + let mut _number_of_bytes_transferred: u32 = 0; // SAFETY: The pointer to the OVERLAPPED structure is under our // control and thus guaranteed to be valid. let res = unsafe { GetOverlappedResultEx( - self.handle.as_raw_handle() as HANDLE, + handle, overlapped.as_ref(), - number_of_bytes_transferred.as_mut_ptr(), + &mut _number_of_bytes_transferred, 0, /* bAlertable */ FALSE, ) }; if res == FALSE { - return Err(PlatformError::AcceptFailed(get_last_error())); + let error = get_last_error(); + if error == ERROR_IO_INCOMPLETE { + // The I/O operation did not complete yet + self.cancel_or_leak(overlapped, None); + } + + return Err(IPCError::System(error)); } Ok(()) @@ -209,49 +171,35 @@ impl OverlappedOperation { fn await_io( mut self, optype: OverlappedOperationType, - ) -> Result<Option<Vec<u8>>, PlatformError> { + wait: bool, + ) -> Result<Option<Vec<u8>>, IPCError> { let overlapped = self.overlapped.take().unwrap(); let buffer = self.buffer.take().unwrap(); - let mut number_of_bytes_transferred = MaybeUninit::<u32>::uninit(); + let mut number_of_bytes_transferred: u32 = 0; // SAFETY: All the pointers passed to this call are under our control // and thus guaranteed to be valid. let res = unsafe { GetOverlappedResultEx( self.handle.as_raw_handle() as HANDLE, overlapped.as_ref(), - number_of_bytes_transferred.as_mut_ptr(), - IO_TIMEOUT as u32, + &mut number_of_bytes_transferred, + if wait { IO_TIMEOUT as u32 } else { 0 }, /* bAlertable */ FALSE, ) }; if res == FALSE { let error = get_last_error(); - if error == WAIT_TIMEOUT { + if (wait && (error == WAIT_TIMEOUT)) || (!wait && (error == ERROR_IO_INCOMPLETE)) { // The I/O operation did not complete yet self.cancel_or_leak(overlapped, Some(buffer)); - } else if error == ERROR_BROKEN_PIPE { - return Err(PlatformError::BrokenPipe); } - return Err(PlatformError::IOError(error)); + return Err(IPCError::System(error)); } - // SAFETY: We've verified that `number_of_bytes_transferred` has been - // populated by the `GetOverlappedResultEx()` call. - let number_of_bytes_transferred = unsafe { number_of_bytes_transferred.assume_init() }; - - if number_of_bytes_transferred as usize != buffer.len() { - return Err(match optype { - OverlappedOperationType::Read => PlatformError::ReceiveTooShort { - expected: buffer.len(), - received: number_of_bytes_transferred as usize, - }, - OverlappedOperationType::Write => PlatformError::SendTooShort { - expected: buffer.len(), - sent: number_of_bytes_transferred as usize, - }, - }); + if (number_of_bytes_transferred as usize) != buffer.len() { + return Err(IPCError::BadMessage(MessageError::InvalidData)); } Ok(match optype { @@ -260,20 +208,14 @@ impl OverlappedOperation { }) } - fn sched_recv_internal( - handle: &Rc<OwnedHandle>, - event: Option<BorrowedHandle>, + pub(crate) fn sched_recv( + handle: OwnedHandle, + event: HANDLE, expected_size: usize, - ) -> Result<OverlappedOperation, PlatformError> { - let mut overlapped = if let Some(event) = event { - OverlappedOperation::overlapped_with_event(event)? - } else { - OverlappedOperation::overlapped() - }; + ) -> Result<OverlappedOperation, IPCError> { + let mut overlapped = Self::overlapped_with_event(event)?; let mut buffer = vec![0u8; expected_size]; - let number_of_bytes_to_read: u32 = expected_size - .try_into() - .map_err(|_e| PlatformError::ValueTooLarge)?; + let number_of_bytes_to_read: u32 = expected_size.try_into()?; // SAFETY: We control all the pointers going into this call, guarantee // that they're valid and that they will be alive for the entire // duration of the asynchronous operation. @@ -289,56 +231,31 @@ impl OverlappedOperation { let error = get_last_error(); if res != FALSE { - if let Some(event) = event { - // The operation completed synchronously, if we have an event - // set it so that waiting on it will return immediately. - set_event(event)?; - } - } else if error == ERROR_BROKEN_PIPE { - return Err(PlatformError::BrokenPipe); + // The operation completed synchronously, set the event so that + // waiting on it will return immediately. + set_event(event)?; } else if error != ERROR_IO_PENDING { - return Err(PlatformError::IOError(error)); + return Err(IPCError::System(error)); } Ok(OverlappedOperation { - handle: handle.clone(), + handle, overlapped: Some(overlapped), buffer: Some(buffer), }) } - pub(crate) fn recv( - handle: &Rc<OwnedHandle>, - event: BorrowedHandle<'_>, - expected_size: usize, - ) -> Result<Vec<u8>, PlatformError> { - let overlapped = Self::sched_recv_internal(handle, Some(event), expected_size)?; - overlapped - .await_io(OverlappedOperationType::Read) - .map(|buffer| buffer.unwrap()) - } - - pub(crate) fn sched_recv( - handle: &Rc<OwnedHandle>, - expected_size: usize, - ) -> Result<OverlappedOperation, PlatformError> { - Self::sched_recv_internal(handle, None, expected_size) + pub(crate) fn collect_recv(self, wait: bool) -> Result<Vec<u8>, IPCError> { + Ok(self.await_io(OverlappedOperationType::Read, wait)?.unwrap()) } - pub(crate) fn collect_recv(mut self) -> Vec<u8> { - self.buffer.take().expect("Missing receive buffer") - } - - pub(crate) fn send( - handle: &Rc<OwnedHandle>, - event: BorrowedHandle<'_>, + pub(crate) fn sched_send( + handle: OwnedHandle, + event: HANDLE, mut buffer: Vec<u8>, - ) -> Result<(), PlatformError> { + ) -> Result<OverlappedOperation, IPCError> { let mut overlapped = Self::overlapped_with_event(event)?; - let number_of_bytes_to_write: u32 = buffer - .len() - .try_into() - .map_err(|_e| PlatformError::ValueTooLarge)?; + let number_of_bytes_to_write: u32 = buffer.len().try_into()?; // SAFETY: We control all the pointers going into this call, guarantee // that they're valid and that they will be alive for the entire // duration of the asynchronous operation. @@ -357,64 +274,36 @@ impl OverlappedOperation { // The operation completed synchronously, set the event so that // waiting on it will return immediately. set_event(event)?; - } else if error == ERROR_BROKEN_PIPE { - return Err(PlatformError::BrokenPipe); } else if error != ERROR_IO_PENDING { - return Err(PlatformError::IOError(error)); + return Err(IPCError::System(error)); } - let overlapped = OverlappedOperation { - handle: handle.clone(), + Ok(OverlappedOperation { + handle, overlapped: Some(overlapped), buffer: Some(buffer), - }; + }) + } - overlapped - .await_io(OverlappedOperationType::Write) - .map(|buffer| { - debug_assert!(buffer.is_none()); - }) + pub(crate) fn complete_send(self, wait: bool) -> Result<(), IPCError> { + self.await_io(OverlappedOperationType::Write, wait)?; + Ok(()) } - fn overlapped_with_event(event: BorrowedHandle<'_>) -> Result<Box<OVERLAPPED>, PlatformError> { - reset_event(event)?; + fn overlapped_with_event(event: HANDLE) -> Result<Box<OVERLAPPED>, IPCError> { + // SAFETY: This is always safe, even when passing an invalid handle. + if unsafe { ResetEvent(event) } == FALSE { + return Err(IPCError::System(get_last_error())); + } - // We set the last bit of the `hEvent` field to prevent this overlapped - // operation from generating completion events. The event handle will - // be notified instead when it completes. Ok(Box::new(OVERLAPPED { - hEvent: event.as_raw_handle() as HANDLE | 1, + hEvent: event, ..unsafe { zeroed() } })) } - fn overlapped() -> Box<OVERLAPPED> { - Box::new(unsafe { zeroed() }) - } - - /// Cancel the pending operation but leave the buffers intact. It's the - /// caller's responsibility to wait for the operation to complete and free - /// the buffers. - pub(crate) fn cancel(&self) -> bool { - if let Some(overlapped) = self.overlapped.as_deref() { - return cancel_overlapped_io(self.handle.as_handle(), overlapped); - } - - true - } - - /// Leak the buffers involved in the operation. - pub(crate) fn leak(&mut self) { - if let Some(overlapped) = self.overlapped.take() { - Box::leak(overlapped); - if let Some(buffer) = self.buffer.take() { - buffer.leak(); - } - } - } - fn cancel_or_leak(&self, mut overlapped: Box<OVERLAPPED>, buffer: Option<Vec<u8>>) { - if !cancel_overlapped_io(self.handle.as_handle(), overlapped.as_mut()) { + if !cancel_overlapped_io(self.handle.as_raw_handle() as HANDLE, overlapped.as_mut()) { // If we cannot cancel the operation we must leak the // associated buffers so that they're available in case it // ever completes. @@ -431,10 +320,6 @@ impl Drop for OverlappedOperation { let overlapped = self.overlapped.take(); let buffer = self.buffer.take(); if let Some(overlapped) = overlapped { - if overlapped.hEvent == 0 { - return; // This operation should have already been cancelled. - } - self.cancel_or_leak(overlapped, buffer); } } diff --git a/toolkit/crashreporter/crash_helper_server/src/ipc_server.rs b/toolkit/crashreporter/crash_helper_server/src/ipc_server.rs @@ -3,13 +3,15 @@ * You can obtain one at http://mozilla.org/MPL/2.0/. */ use anyhow::Result; -use crash_helper_common::{ - messages::Header, AncillaryData, IPCConnector, IPCConnectorKey, IPCEvent, IPCListener, IPCQueue, -}; -use std::{collections::HashMap, rc::Rc}; +use crash_helper_common::{errors::IPCError, messages, IPCConnector, IPCEvent, IPCListener}; use crate::crash_generation::{CrashGenerator, MessageResult}; +#[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))] +mod unix; +#[cfg(target_os = "windows")] +mod windows; + #[derive(PartialEq)] pub enum IPCServerState { Running, @@ -19,73 +21,61 @@ pub enum IPCServerState { #[derive(PartialEq)] enum IPCEndpoint { Parent, // A connection to the parent process - - Child, // A connection to the child process #[allow(dead_code)] + Child, // A connection to the child process External, // A connection to an external process } struct IPCConnection { - connector: Rc<IPCConnector>, + connector: IPCConnector, endpoint: IPCEndpoint, } pub(crate) struct IPCServer { - /// Platform-specific mechanism to wait for events. This will contain - /// references to the connectors so needs to be the first element in - /// the structure so that it's dropped first. - queue: IPCQueue, - connections: HashMap<IPCConnectorKey, IPCConnection>, + #[cfg_attr(unix, allow(dead_code))] + listener: IPCListener, + connections: Vec<IPCConnection>, } impl IPCServer { - pub(crate) fn new(listener: IPCListener, connector: IPCConnector) -> Result<IPCServer> { - let connector = Rc::new(connector); - let mut queue = IPCQueue::new(listener)?; - queue.add_connector(&connector)?; - - let mut connections = HashMap::with_capacity(10); - connections.insert( - connector.key(), - IPCConnection { + pub(crate) fn new(listener: IPCListener, connector: IPCConnector) -> IPCServer { + IPCServer { + listener, + connections: vec![IPCConnection { connector, endpoint: IPCEndpoint::Parent, - }, - ); - - Ok(IPCServer { queue, connections }) + }], + } } - pub(crate) fn run(&mut self, generator: &mut CrashGenerator) -> Result<IPCServerState> { - let events = self.queue.wait_for_events()?; + pub(crate) fn run( + &mut self, + generator: &mut CrashGenerator, + ) -> Result<IPCServerState, IPCError> { + let events = self.wait_for_events()?; - for event in events.into_iter() { + // We reverse the order of events, so that we start processing them + // from the highest indexes toward the lowest. If we did the opposite + // removed connections would invalidate the successive indexes. + for event in events.into_iter().rev() { match event { IPCEvent::Connect(connector) => { - self.connections.insert( - connector.key(), - IPCConnection { - connector, - endpoint: IPCEndpoint::External, - }, - ); + self.connections.push(IPCConnection { + connector, + endpoint: IPCEndpoint::External, + }); } - IPCEvent::Message(key, header, payload, ancillary_data) => { - if let Err(error) = - self.handle_message(key, &header, payload, ancillary_data, generator) - { + IPCEvent::Header(index, header) => { + let res = self.handle_message(index, &header, generator); + if let Err(error) = res { log::error!( - "Error {error} when handling a message of kind {:?}", + "Error {error} while handling a message of {:?} kind", header.kind ); } } - IPCEvent::Disconnect(key) => { - let connection = self - .connections - .remove(&key) - .expect("Disconnection event but no corresponding connection"); - + IPCEvent::Disconnect(index) => { + let connection = self.connections.remove(index); if connection.endpoint == IPCEndpoint::Parent { // The main process disconnected, leave return Ok(IPCServerState::ClientDisconnected); @@ -99,41 +89,37 @@ impl IPCServer { fn handle_message( &mut self, - key: IPCConnectorKey, - header: &Header, - data: Vec<u8>, - ancillary_data: Option<AncillaryData>, + index: usize, + header: &messages::Header, generator: &mut CrashGenerator, ) -> Result<()> { let connection = self .connections - .get(&key) - .expect("Event received on non-existing connection"); - let connector = &connection.connector; + .get_mut(index) + .expect("Invalid connector index"); + let connector = &mut connection.connector; + let (data, ancillary_data) = connector.recv(header.size)?; - match connection.endpoint { + let connection = match connection.endpoint { IPCEndpoint::Parent => { - let res = - generator.parent_message(connector, header.kind, &data, ancillary_data)?; - if let MessageResult::Connection(connector) = res { - let connector = Rc::new(connector); - self.queue.add_connector(&connector)?; - self.connections.insert( - connector.key(), - IPCConnection { - connector, - endpoint: IPCEndpoint::Child, - }, - ); - } - } - IPCEndpoint::Child => { - generator.child_message(header.kind, &data, ancillary_data)?; + generator.parent_message(connector, header.kind, &data, ancillary_data) } + IPCEndpoint::Child => generator.child_message(header.kind, &data, ancillary_data), IPCEndpoint::External => { - generator.external_message(connector, header.kind, &data, ancillary_data)?; + generator.external_message(connector, header.kind, &data, ancillary_data) } - }; + }?; + + match connection { + MessageResult::Connection(connector) => { + self.connections.push(IPCConnection { + connector, + endpoint: IPCEndpoint::Child, + }); + } + + MessageResult::None => {} + } Ok(()) } diff --git a/toolkit/crashreporter/crash_helper_server/src/ipc_server/unix.rs b/toolkit/crashreporter/crash_helper_server/src/ipc_server/unix.rs @@ -0,0 +1,65 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crash_helper_common::{errors::IPCError, ignore_eintr, IPCEvent}; +use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; + +use super::IPCServer; + +impl IPCServer { + pub fn wait_for_events(&mut self) -> Result<Vec<IPCEvent>, IPCError> { + let mut pollfds = Vec::with_capacity(self.connections.len()); + pollfds.extend( + self.connections.iter().map(|connection| { + PollFd::new(connection.connector.as_raw_ref(), PollFlags::POLLIN) + }), + ); + + let mut events = Vec::<IPCEvent>::new(); + let mut num_events = + ignore_eintr!(poll(&mut pollfds, PollTimeout::NONE)).map_err(IPCError::System)?; + + for (index, pollfd) in pollfds.iter().enumerate() { + // revents() returns None only if the kernel sends back data + // that nix does not understand, we can safely assume this + // never happens in practice hence the unwrap(). + let revents = pollfd.revents().unwrap(); + + if revents.contains(PollFlags::POLLHUP) { + events.push(IPCEvent::Disconnect(index)); + // If a process was disconnected then skip all further + // processing of the socket. This wouldn't matter normally, + // but on macOS calling recvmsg() on a hung-up socket seems + // to trigger a kernel panic, one we've already encountered + // in the past. Doing things this way avoids the panic + // while having no real downsides. + continue; + } + + if revents.contains(PollFlags::POLLIN) { + // SAFETY: The index is guaranteed to be >0 and within + // the bounds of the connections array. + let connection = unsafe { self.connections.get_unchecked(index) }; + let header = connection.connector.recv_header(); + if let Ok(header) = header { + // Note that if we encounter a failure we don't propagate + // it, when the socket gets disconnected we'll get a + // POLLHUP event anyway so deal with disconnections there + // instead of here. + events.push(IPCEvent::Header(index, header)); + } + } + + if !revents.is_empty() { + num_events -= 1; + + if num_events == 0 { + break; + } + } + } + + Ok(events) + } +} diff --git a/toolkit/crashreporter/crash_helper_server/src/ipc_server/windows.rs b/toolkit/crashreporter/crash_helper_server/src/ipc_server/windows.rs @@ -0,0 +1,96 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::convert::TryInto; + +use crash_helper_common::{errors::IPCError, IPCEvent}; +use log::error; +use windows_sys::Win32::{ + Foundation::{ERROR_BROKEN_PIPE, FALSE, HANDLE, WAIT_OBJECT_0}, + System::{ + SystemServices::MAXIMUM_WAIT_OBJECTS, + Threading::{WaitForMultipleObjects, INFINITE}, + }, +}; + +use super::IPCServer; + +impl IPCServer { + pub fn wait_for_events(&mut self) -> Result<Vec<IPCEvent>, IPCError> { + for connection in self.connections.iter_mut() { + // TODO: We might get a broken pipe error here which would cause us to + // fail instead of just dropping the disconnected connection. + connection.connector.sched_recv_header()?; + } + + let native_events = self.collect_events(); + + // SAFETY: This is less than MAXIMUM_WAIT_OBJECTS + let native_events_len: u32 = unsafe { native_events.len().try_into().unwrap_unchecked() }; + + let res = unsafe { + WaitForMultipleObjects( + native_events_len, + native_events.as_ptr(), + FALSE, // bWaitAll + INFINITE, + ) + }; + + if res >= (WAIT_OBJECT_0 + native_events_len) { + return Err(IPCError::WaitingFailure(None)); + } + + let index = (res - WAIT_OBJECT_0) as usize; + + let mut events = Vec::<IPCEvent>::new(); + if index == 0 { + if let Ok(connector) = self.listener.accept() { + events.push(IPCEvent::Connect(connector)); + } + } else { + let index = index - 1; + // SAFETY: The index is guaranteed to be within the bounds of the connections array. + let connection = unsafe { self.connections.get_unchecked_mut(index) }; + let header = connection.connector.collect_header(); + + match header { + Ok(header) => { + events.push(IPCEvent::Header(index, header)); + } + Err(error) => match error { + IPCError::System(_code @ ERROR_BROKEN_PIPE) => { + events.push(IPCEvent::Disconnect(index)); + } + _ => return Err(error), + }, + } + } + + Ok(events) + } + + /// This currently returns a vector that is no longer than + /// `MAXIMUM_WAIT_OBJECTS`, so its contents can be safely passed to + /// a `WaitForMultipleObjects()` call. + fn collect_events(&self) -> Vec<HANDLE> { + let mut events = Vec::with_capacity(1 + self.connections.len()); + + events.push(self.listener.event_raw_handle()); + for connection in self.connections.iter() { + events.push(connection.connector.event_raw_handle()); + } + + // HACK: When we hit this limit we should be splitting this list in + // multiple groups of at most MAXIMUM_WAIT_OBJECTS objects and have + // several threads wait on the groups, then wait on the threads + // themselves. + if events.len() > MAXIMUM_WAIT_OBJECTS.try_into().unwrap() { + error!("More than {MAXIMUM_WAIT_OBJECTS} processes connecting to the crash helper"); + events.truncate(MAXIMUM_WAIT_OBJECTS.try_into().unwrap()); + } + + events + } +} diff --git a/toolkit/crashreporter/crash_helper_server/src/lib.rs b/toolkit/crashreporter/crash_helper_server/src/lib.rs @@ -16,10 +16,7 @@ use crash_helper_common::Pid; #[cfg(target_os = "android")] use crash_helper_common::RawAncillaryData; use crash_helper_common::{BreakpadData, BreakpadRawData, IPCConnector, IPCListener}; -use std::{ - ffi::{c_char, CStr, OsString}, - fmt::Display, -}; +use std::ffi::{c_char, CStr, OsString}; use crash_generation::CrashGenerator; use ipc_server::{IPCServer, IPCServerState}; @@ -55,25 +52,26 @@ pub unsafe extern "C" fn crash_generator_logic_desktop( .unwrap(); let minidump_path = OsString::from(minidump_path); let listener = unsafe { CStr::from_ptr(listener) }; - let listener = unwrap_with_message( - IPCListener::deserialize(listener, client_pid), - "Could not parse the crash generator's listener", - ); + let listener = IPCListener::deserialize(listener, client_pid) + .map_err(|error| { + log::error!("Could not parse the crash generator's listener (error: {error})"); + }) + .unwrap(); let pipe = unsafe { CStr::from_ptr(pipe) }; - let connector = unwrap_with_message( - IPCConnector::deserialize(pipe), - "Could not parse the crash generator's connector", - ); + let connector = IPCConnector::deserialize(pipe) + .map_err(|error| { + log::error!("Could not parse the crash generator's connector (error: {error})"); + }) + .unwrap(); - let crash_generator = unwrap_with_message( - CrashGenerator::new(breakpad_data, minidump_path), - "Could not create the crash generator", - ); + let crash_generator = CrashGenerator::new(breakpad_data, minidump_path) + .map_err(|error| { + log::error!("Could not create the crash generator (error: {error})"); + error + }) + .unwrap(); - let ipc_server = unwrap_with_message( - IPCServer::new(listener, connector), - "Could not create the IPC server", - ); + let ipc_server = IPCServer::new(listener, connector); main_loop(ipc_server, crash_generator) } @@ -104,28 +102,26 @@ pub unsafe extern "C" fn crash_generator_logic_android( .into_string() .unwrap(); let minidump_path = OsString::from(minidump_path); + let crash_generator = CrashGenerator::new(breakpad_data, minidump_path) + .map_err(|error| { + log::error!("Could not create the crash generator (error: {error})"); + error + }) + .unwrap(); + + let listener = IPCListener::new(0).unwrap(); + // SAFETY: The `pipe` file descriptor passed in from the caller is + // guaranteed to be valid. + let connector = unsafe { IPCConnector::from_raw_ancillary(pipe) } + .map_err(|error| { + log::error!("Could not use the pipe (error: {error})"); + }) + .unwrap(); + let ipc_server = IPCServer::new(listener, connector); // On Android the main thread is used to respond to the intents so we // can't block it. Run the crash generation loop in a separate thread. - let _ = std::thread::spawn(move || { - let crash_generator = unwrap_with_message( - CrashGenerator::new(breakpad_data, minidump_path), - "Could not create the crash generator", - ); - - let listener = IPCListener::new(0).unwrap(); - // SAFETY: The `pipe` file descriptor passed in from the caller is - // guaranteed to be valid. - let connector = unwrap_with_message( - unsafe { IPCConnector::from_raw_ancillary(pipe) }, - "Could not use the pipe", - ); - let ipc_server = unwrap_with_message( - IPCServer::new(listener, connector), - "Could not create the IPC server", - ); - main_loop(ipc_server, crash_generator) - }); + let _ = std::thread::spawn(move || main_loop(ipc_server, crash_generator)); } fn main_loop(mut ipc_server: IPCServer, mut crash_generator: CrashGenerator) -> i32 { @@ -183,13 +179,3 @@ fn daemonize() { } } } - -fn unwrap_with_message<T, E: Display>(res: Result<T, E>, error_string: &str) -> T { - match res { - Ok(value) => value, - Err(error) => { - log::error!("{error_string} (error: {error})"); - panic!("{} (error: {})", error_string, error); - } - } -}