tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

channel.rs (6314B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 use crate::{Epoch, PipelineId};
      6 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
      7 use serde::{Deserialize, Deserializer, Serialize, Serializer};
      8 use std::io::{self, Cursor, Error, ErrorKind, Read};
      9 use std::mem;
     10 
     11 pub use crossbeam_channel as crossbeam;
     12 
     13 #[cfg(not(target_os = "windows"))]
     14 pub use crossbeam_channel::{Sender, Receiver};
     15 
     16 #[cfg(target_os = "windows")]
     17 pub use std::sync::mpsc::{Sender, Receiver};
     18 
/// A block of display list bytes tagged with the epoch and pipeline id it
/// belongs to.
///
/// `Payload::to_data` and `Payload::from_data` convert a payload to and from
/// a flat byte buffer.
#[derive(Clone)]
pub struct Payload {
    /// An epoch used to get the proper payload for a pipeline id frame request.
    ///
    /// TODO(emilio): Is this still relevant? We send the messages for the same
    /// pipeline in order, so we shouldn't need it. Seems like this was only
    /// wallpapering (in most cases) the underlying problem in #991.
    pub epoch: Epoch,
    /// A pipeline id to key the payload with, along with the epoch.
    pub pipeline_id: PipelineId,
    /// The raw display list bytes carried by this payload.
    pub display_list_data: Vec<u8>,
}
     31 
     32 impl Payload {
     33    /// Convert the payload to a raw byte vector, in order for it to be
     34    /// efficiently shared via shmem, for example.
     35    /// This is a helper static method working on a slice.
     36    pub fn construct_data(epoch: Epoch, pipeline_id: PipelineId, dl_data: &[u8]) -> Vec<u8> {
     37        let mut data = Vec::with_capacity(
     38            mem::size_of::<u32>() + 2 * mem::size_of::<u32>() + mem::size_of::<u64>() + dl_data.len(),
     39        );
     40        data.write_u32::<LittleEndian>(epoch.0).unwrap();
     41        data.write_u32::<LittleEndian>(pipeline_id.0).unwrap();
     42        data.write_u32::<LittleEndian>(pipeline_id.1).unwrap();
     43        data.write_u64::<LittleEndian>(dl_data.len() as u64)
     44            .unwrap();
     45        data.extend_from_slice(dl_data);
     46        data
     47    }
     48    /// Convert the payload to a raw byte vector, in order for it to be
     49    /// efficiently shared via shmem, for example.
     50    pub fn to_data(&self) -> Vec<u8> {
     51        Self::construct_data(self.epoch, self.pipeline_id, &self.display_list_data)
     52    }
     53 
     54    /// Deserializes the given payload from a raw byte vector.
     55    pub fn from_data(data: &[u8]) -> Payload {
     56        let mut payload_reader = Cursor::new(data);
     57        let epoch = Epoch(payload_reader.read_u32::<LittleEndian>().unwrap());
     58        let pipeline_id = PipelineId(
     59            payload_reader.read_u32::<LittleEndian>().unwrap(),
     60            payload_reader.read_u32::<LittleEndian>().unwrap(),
     61        );
     62 
     63        let dl_size = payload_reader.read_u64::<LittleEndian>().unwrap() as usize;
     64        let mut built_display_list_data = vec![0; dl_size];
     65        payload_reader
     66            .read_exact(&mut built_display_list_data[..])
     67            .unwrap();
     68 
     69        assert_eq!(payload_reader.position(), data.len() as u64);
     70 
     71        Payload {
     72            epoch,
     73            pipeline_id,
     74            display_list_data: built_display_list_data,
     75        }
     76    }
     77 }
     78 
/// Sending half of a channel that carries `Payload` values.
pub type PayloadSender = MsgSender<Payload>;

/// Receiving half of a channel that carries `Payload` values.
pub type PayloadReceiver = MsgReceiver<Payload>;
     82 
     83 pub struct MsgReceiver<T> {
     84    rx: Receiver<T>,
     85 }
     86 
     87 impl<T> MsgReceiver<T> {
     88    pub fn recv(&self) -> Result<T, Error> {
     89        self.rx.recv().map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))
     90    }
     91 
     92    pub fn to_crossbeam_receiver(self) -> Receiver<T> {
     93        self.rx
     94    }
     95 }
     96 
     97 #[derive(Clone)]
     98 pub struct MsgSender<T> {
     99    tx: Sender<T>,
    100 }
    101 
    102 impl<T> MsgSender<T> {
    103    pub fn send(&self, data: T) -> Result<(), Error> {
    104        self.tx.send(data).map_err(|_| Error::new(ErrorKind::Other, "cannot send on closed channel"))
    105    }
    106 }
    107 
    108 pub fn payload_channel() -> Result<(PayloadSender, PayloadReceiver), Error> {
    109    let (tx, rx) = unbounded_channel();
    110    Ok((PayloadSender { tx }, PayloadReceiver { rx }))
    111 }
    112 
    113 pub fn msg_channel<T>() -> Result<(MsgSender<T>, MsgReceiver<T>), Error> {
    114    let (tx, rx) = unbounded_channel();
    115    Ok((MsgSender { tx }, MsgReceiver { rx }))
    116 }
    117 
    118 ///
    119 /// These serialize methods are needed to satisfy the compiler
    120 /// which uses these implementations for the recording tool.
    121 /// The recording tool only outputs messages that don't contain
    122 /// Senders or Receivers, so in theory these should never be
    123 /// called in the in-process config. If they are called,
    124 /// there may be a bug in the messages that the replay tool is writing.
    125 ///
    126 
    127 impl<T> Serialize for MsgSender<T> {
    128    fn serialize<S: Serializer>(&self, _: S) -> Result<S::Ok, S::Error> {
    129        unreachable!();
    130    }
    131 }
    132 
    133 impl<'de, T> Deserialize<'de> for MsgSender<T> {
    134    fn deserialize<D>(_: D) -> Result<MsgSender<T>, D::Error>
    135                      where D: Deserializer<'de> {
    136        unreachable!();
    137    }
    138 }
    139 
    140 /// A create a channel intended for one-shot uses, for example the channels
    141 /// created to block on a synchronous query and then discarded,
    142 #[cfg(not(target_os = "windows"))]
    143 pub fn single_msg_channel<T>() -> (Sender<T>, Receiver<T>) {
    144    crossbeam_channel::bounded(1)
    145 }
    146 
    147 /// A fast MPMC message channel that can hold a fixed number of messages.
    148 ///
    149 /// If the channel is full, the sender will block upon sending extra messages
    150 /// until the receiver has consumed some messages.
    151 /// The capacity parameter should be chosen either:
    152 ///  - high enough to avoid blocking on the common cases,
    153 ///  - or, on the contrary, using the blocking behavior as a means to prevent
    154 ///    fast producers from building up work faster than it is consumed.
    155 #[cfg(not(target_os = "windows"))]
    156 pub fn fast_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    157    crossbeam_channel::bounded(capacity)
    158 }
    159 
    160 /// Creates an MPMC channel that is a bit slower than the fast_channel but doesn't
    161 /// have a limit on the number of messages held at a given time and therefore
    162 /// doesn't block when sending.
    163 #[cfg(not(target_os = "windows"))]
    164 pub use crossbeam_channel::unbounded as unbounded_channel;
    165 
    166 
    167 #[cfg(target_os = "windows")]
    168 pub fn fast_channel<T>(_cap: usize) -> (Sender<T>, Receiver<T>) {
    169    std::sync::mpsc::channel()
    170 }
    171 
    172 #[cfg(target_os = "windows")]
    173 pub fn unbounded_channel<T>() -> (Sender<T>, Receiver<T>) {
    174    std::sync::mpsc::channel()
    175 }
    176 
    177 #[cfg(target_os = "windows")]
    178 pub fn single_msg_channel<T>() -> (Sender<T>, Receiver<T>) {
    179    std::sync::mpsc::channel()
    180 }