tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

asynchronous.rs (10566B)


      1 use std::{
      2    pin::Pin,
      3    task::{Context, Poll},
      4 };
      5 
      6 use crate::{
      7    env::ErrorReporter,
      8    errors::{L10nRegistryError, L10nRegistrySetupError},
      9    fluent::{FluentBundle, FluentError},
     10    registry::{BundleAdapter, L10nRegistry, MetaSources},
     11    solver::{AsyncTester, ParallelProblemSolver},
     12    source::{ResourceOption, ResourceStatus},
     13 };
     14 
     15 use fluent_fallback::{generator::BundleStream, types::ResourceId};
     16 use futures::{
     17    stream::{Collect, FuturesOrdered},
     18    Stream, StreamExt,
     19 };
     20 use std::future::Future;
     21 use unic_langid::LanguageIdentifier;
     22 
     23 impl<P, B> L10nRegistry<P, B>
     24 where
     25    P: Clone,
     26    B: Clone,
     27 {
     28    /// This method is useful for testing various configurations.
     29    #[cfg(feature = "test-fluent")]
     30    pub fn generate_bundles_for_lang(
     31        &self,
     32        langid: LanguageIdentifier,
     33        resource_ids: Vec<ResourceId>,
     34    ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> {
     35        let lang_ids = vec![langid];
     36 
     37        Ok(GenerateBundles::new(
     38            self.clone(),
     39            lang_ids.into_iter(),
     40            resource_ids,
     41            // Cheaply create an immutable shallow copy of the [MetaSources].
     42            self.try_borrow_metasources()?.clone(),
     43        ))
     44    }
     45 
     46    // Asynchronously generate the bundles.
     47    pub fn generate_bundles(
     48        &self,
     49        locales: std::vec::IntoIter<LanguageIdentifier>,
     50        resource_ids: Vec<ResourceId>,
     51    ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> {
     52        Ok(GenerateBundles::new(
     53            self.clone(),
     54            locales,
     55            resource_ids,
     56            // Cheaply create an immutable shallow copy of the [MetaSources].
     57            self.try_borrow_metasources()?.clone(),
     58        ))
     59    }
     60 }
     61 
/// This enum contains the various states the [GenerateBundles] can be in during the
/// asynchronous generation step.
enum State<P, B> {
    /// No locale is currently selected. This is the initial state and the
    /// [`Default`] value.
    Empty,
    /// A locale is selected but no solver is stored. `take_solver` leaves the
    /// state in this variant until `put_back_solver` is called.
    Locale(LanguageIdentifier),
    /// A locale is selected and a solver for it is stored.
    Solver {
        /// The locale the solver is working on.
        locale: LanguageIdentifier,
        /// The solver that is polled for resource orderings.
        solver: ParallelProblemSolver<GenerateBundles<P, B>>,
    },
}
     72 
     73 impl<P, B> Default for State<P, B> {
     74    fn default() -> Self {
     75        Self::Empty
     76    }
     77 }
     78 
     79 impl<P, B> State<P, B> {
     80    fn get_locale(&self) -> &LanguageIdentifier {
     81        match self {
     82            Self::Locale(locale) => locale,
     83            Self::Solver { locale, .. } => locale,
     84            Self::Empty => unreachable!("Attempting to get a locale for an empty state."),
     85        }
     86    }
     87 
     88    fn take_solver(&mut self) -> ParallelProblemSolver<GenerateBundles<P, B>> {
     89        replace_with::replace_with_or_default_and_return(self, |self_| match self_ {
     90            Self::Solver { locale, solver } => (solver, Self::Locale(locale)),
     91            _ => unreachable!("Attempting to take a solver in an invalid state."),
     92        })
     93    }
     94 
     95    fn put_back_solver(&mut self, solver: ParallelProblemSolver<GenerateBundles<P, B>>) {
     96        replace_with::replace_with_or_default(self, |self_| match self_ {
     97            Self::Locale(locale) => Self::Solver { locale, solver },
     98            _ => unreachable!("Attempting to put back a solver in an invalid state."),
     99        })
    100    }
    101 }
    102 
/// Asynchronous bundle generator created by `L10nRegistry::generate_bundles`.
///
/// It walks the requested locales and, for each locale, the metasources of the
/// snapshot taken at construction time, keeping its progress in `state`.
pub struct GenerateBundles<P, B> {
    /// Do not access the metasources in the registry, as they may be mutated between
    /// async iterations.
    reg: L10nRegistry<P, B>,
    /// This is an immutable shallow copy of the MetaSources that should not be mutated
    /// during the iteration process. This ensures that the iterator will still be
    /// valid if the L10nRegistry is mutated while iterating through the sources.
    metasources: MetaSources,
    /// The locales that have not been started yet; the next one is pulled when
    /// the current locale has no metasources left to try.
    locales: std::vec::IntoIter<LanguageIdentifier>,
    /// Index into `metasources` of the metasource currently being searched.
    current_metasource: usize,
    /// The resources every generated bundle is built from.
    resource_ids: Vec<ResourceId>,
    /// Current locale/solver progress of the generation.
    state: State<P, B>,
}
    116 
    117 impl<P, B> GenerateBundles<P, B> {
    118    fn new(
    119        reg: L10nRegistry<P, B>,
    120        locales: std::vec::IntoIter<LanguageIdentifier>,
    121        resource_ids: Vec<ResourceId>,
    122        metasources: MetaSources,
    123    ) -> Self {
    124        Self {
    125            reg,
    126            metasources,
    127            locales,
    128            current_metasource: 0,
    129            resource_ids,
    130            state: State::Empty,
    131        }
    132    }
    133 }
    134 
/// Future that drives an ordered set of [`ResourceStatus`] futures and collects
/// their outputs into a `Vec<ResourceOption>`.
pub type ResourceSetStream = Collect<FuturesOrdered<ResourceStatus>, Vec<ResourceOption>>;
/// Newtype around [`ResourceSetStream`]; it is the `AsyncTester::Result` type
/// returned by `GenerateBundles::test_async`.
pub struct TestResult(ResourceSetStream);
// NOTE(review): `TestResult::poll` calls `Pin::new` on the inner future, which
// already requires it to be `Unpin`, so this explicit impl looks redundant —
// confirm before removing.
impl std::marker::Unpin for TestResult {}
    138 
    139 impl Future for TestResult {
    140    type Output = Vec<bool>;
    141 
    142    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    143        let pinned = Pin::new(&mut self.0);
    144        pinned
    145            .poll(cx)
    146            .map(|set| set.iter().map(|c| !c.is_required_and_missing()).collect())
    147    }
    148 }
    149 
    150 impl<'l, P, B> AsyncTester for GenerateBundles<P, B> {
    151    type Result = TestResult;
    152 
    153    fn test_async(&self, query: Vec<(usize, usize)>) -> Self::Result {
    154        let locale = self.state.get_locale();
    155 
    156        let stream = query
    157            .iter()
    158            .map(|(res_idx, source_idx)| {
    159                let resource_id = &self.resource_ids[*res_idx];
    160                self.metasources
    161                    .filesource(self.current_metasource, *source_idx)
    162                    .fetch_file(locale, resource_id)
    163            })
    164            .collect::<FuturesOrdered<_>>();
    165        TestResult(stream.collect::<_>())
    166    }
    167 }
    168 
#[async_trait::async_trait(?Send)]
impl<P, B> BundleStream for GenerateBundles<P, B> {
    /// Prefetching is not implemented for the asynchronous generator.
    ///
    /// # Panics
    /// Always panics via `todo!()` when the returned future is polled.
    async fn prefetch_async(&mut self) {
        todo!();
    }
}
    175 
/// Generate [FluentBundles](FluentBundle) asynchronously.
///
/// Locales are processed in the order they are yielded by `locales`. For each
/// locale the metasources are searched from the last index down to index 0,
/// with a fresh solver created for every metasource.
impl<P, B> Stream for GenerateBundles<P, B>
where
    P: ErrorReporter,
    B: BundleAdapter,
{
    /// The value produced by `MetaSources::bundle_from_order`.
    // NOTE(review): presumably `Err` carries the bundle together with the errors
    // hit while building it — confirm against `bundle_from_order`.
    type Item = Result<FluentBundle, (FluentBundle, Vec<FluentError>)>;

    /// Asynchronously try and get a solver, and then with the solver generate a bundle.
    /// If the solver is not ready yet, then this function will return as `Pending`, and
    /// the Future runner will need to re-enter at a later point to try again.
    ///
    /// Returns `Ready(None)` once every locale has been exhausted, or immediately
    /// when there are no metasources at all.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.metasources.is_empty() {
            // There are no metasources available, so no bundles can be generated.
            return None.into();
        }
        loop {
            if let State::Solver { .. } = self.state {
                // A solver has already been set up, continue iterating through the
                // resources and generating a bundle.

                // Pin the solver so that the async try_poll_next can be called.
                // The solver is moved out of `self.state` (leaving `State::Locale`)
                // so it can be polled while `self` is handed to it by shared
                // reference as the tester.
                let mut solver = self.state.take_solver();
                let pinned_solver = Pin::new(&mut solver);

                if let std::task::Poll::Ready(solver_result) =
                    pinned_solver.try_poll_next(cx, &self, false)
                {
                    // The solver is ready, but may not have generated an ordering.

                    if let Ok(Some(order)) = solver_result {
                        // The solver resolved an ordering, and a bundle may be able
                        // to be generated.

                        // The state is `State::Locale` at this point, so
                        // `get_locale` is still valid.
                        let bundle = self.metasources.bundle_from_order(
                            self.current_metasource,
                            self.state.get_locale().clone(),
                            &order,
                            &self.resource_ids,
                            &self.reg.shared.provider,
                            self.reg.shared.bundle_adapter.as_ref(),
                        );

                        // Restore the solver before returning or looping so the
                        // next poll resumes with the same solver.
                        self.state.put_back_solver(solver);

                        if bundle.is_some() {
                            // The bundle was successfully generated.
                            return bundle.into();
                        }

                        // No bundle was generated, continue on.
                        continue;
                    }

                    // There is no bundle ordering available.

                    if self.current_metasource > 0 {
                        // There are more metasources, create a new solver and try the
                        // next metasource. If there is an error in the solver_result
                        // ignore it for now, since there are more metasources.
                        self.current_metasource -= 1;
                        let solver = ParallelProblemSolver::new(
                            self.resource_ids.len(),
                            self.metasources.get(self.current_metasource).len(),
                        );
                        // The previous solver was taken out above and is not put
                        // back; it is replaced by the new one for the same locale.
                        self.state = State::Solver {
                            locale: self.state.get_locale().clone(),
                            solver,
                        };
                        continue;
                    }

                    if let Err(idx) = solver_result {
                        // Since there are no more metasources, and there is an error,
                        // report it instead of ignoring it.
                        // `idx` is used as an index into `resource_ids` to name
                        // the missing resource.
                        self.reg.shared.provider.report_errors(vec![
                            L10nRegistryError::MissingResource {
                                locale: self.state.get_locale().clone(),
                                resource_id: self.resource_ids[idx].clone(),
                            },
                        ]);
                    }

                    // There are no more metasources.
                    self.state = State::Empty;
                    continue;
                }

                // The solver is not ready yet, so exit out of this async task
                // and mark it as pending. It can be tried again later.
                self.state.put_back_solver(solver);
                return std::task::Poll::Pending;
            }

            // No solver is active: either iteration has not started yet, or the
            // metasources for the previous locale have all been searched.

            // Try the next locale.
            if let Some(locale) = self.locales.next() {
                // Restart at the end of the metasources for this locale, and iterate
                // backwards.
                // The subtraction cannot underflow: the empty case returned early
                // at the top of this function.
                let last_metasource_idx = self.metasources.len() - 1;
                self.current_metasource = last_metasource_idx;

                let solver = ParallelProblemSolver::new(
                    self.resource_ids.len(),
                    self.metasources.get(self.current_metasource).len(),
                );
                self.state = State::Solver { locale, solver };

                // Continue iterating on the next solver.
                continue;
            }

            // There are no more locales or metasources to search. This iterator
            // is done.
            return None.into();
        }
    }
}