asynchronous.rs (10566B)
1 use std::{ 2 pin::Pin, 3 task::{Context, Poll}, 4 }; 5 6 use crate::{ 7 env::ErrorReporter, 8 errors::{L10nRegistryError, L10nRegistrySetupError}, 9 fluent::{FluentBundle, FluentError}, 10 registry::{BundleAdapter, L10nRegistry, MetaSources}, 11 solver::{AsyncTester, ParallelProblemSolver}, 12 source::{ResourceOption, ResourceStatus}, 13 }; 14 15 use fluent_fallback::{generator::BundleStream, types::ResourceId}; 16 use futures::{ 17 stream::{Collect, FuturesOrdered}, 18 Stream, StreamExt, 19 }; 20 use std::future::Future; 21 use unic_langid::LanguageIdentifier; 22 23 impl<P, B> L10nRegistry<P, B> 24 where 25 P: Clone, 26 B: Clone, 27 { 28 /// This method is useful for testing various configurations. 29 #[cfg(feature = "test-fluent")] 30 pub fn generate_bundles_for_lang( 31 &self, 32 langid: LanguageIdentifier, 33 resource_ids: Vec<ResourceId>, 34 ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> { 35 let lang_ids = vec![langid]; 36 37 Ok(GenerateBundles::new( 38 self.clone(), 39 lang_ids.into_iter(), 40 resource_ids, 41 // Cheaply create an immutable shallow copy of the [MetaSources]. 42 self.try_borrow_metasources()?.clone(), 43 )) 44 } 45 46 // Asynchronously generate the bundles. 47 pub fn generate_bundles( 48 &self, 49 locales: std::vec::IntoIter<LanguageIdentifier>, 50 resource_ids: Vec<ResourceId>, 51 ) -> Result<GenerateBundles<P, B>, L10nRegistrySetupError> { 52 Ok(GenerateBundles::new( 53 self.clone(), 54 locales, 55 resource_ids, 56 // Cheaply create an immutable shallow copy of the [MetaSources]. 57 self.try_borrow_metasources()?.clone(), 58 )) 59 } 60 } 61 62 /// This enum contains the various states the [GenerateBundles] can be in during the 63 /// asynchronous generation step. 64 enum State<P, B> { 65 Empty, 66 Locale(LanguageIdentifier), 67 Solver { 68 locale: LanguageIdentifier, 69 solver: ParallelProblemSolver<GenerateBundles<P, B>>, 70 }, 71 } 72 73 impl<P, B> Default for State<P, B> { 74 fn default() -> Self { 75 Self::Empty 76 } 77 } 78 79 impl<P, B> State<P, B> { 80 fn get_locale(&self) -> &LanguageIdentifier { 81 match self { 82 Self::Locale(locale) => locale, 83 Self::Solver { locale, .. } => locale, 84 Self::Empty => unreachable!("Attempting to get a locale for an empty state."), 85 } 86 } 87 88 fn take_solver(&mut self) -> ParallelProblemSolver<GenerateBundles<P, B>> { 89 replace_with::replace_with_or_default_and_return(self, |self_| match self_ { 90 Self::Solver { locale, solver } => (solver, Self::Locale(locale)), 91 _ => unreachable!("Attempting to take a solver in an invalid state."), 92 }) 93 } 94 95 fn put_back_solver(&mut self, solver: ParallelProblemSolver<GenerateBundles<P, B>>) { 96 replace_with::replace_with_or_default(self, |self_| match self_ { 97 Self::Locale(locale) => Self::Solver { locale, solver }, 98 _ => unreachable!("Attempting to put back a solver in an invalid state."), 99 }) 100 } 101 } 102 103 pub struct GenerateBundles<P, B> { 104 /// Do not access the metasources in the registry, as they may be mutated between 105 /// async iterations. 106 reg: L10nRegistry<P, B>, 107 /// This is an immutable shallow copy of the MetaSources that should not be mutated 108 /// during the iteration process. This ensures that the iterator will still be 109 /// valid if the L10nRegistry is mutated while iterating through the sources. 110 metasources: MetaSources, 111 locales: std::vec::IntoIter<LanguageIdentifier>, 112 current_metasource: usize, 113 resource_ids: Vec<ResourceId>, 114 state: State<P, B>, 115 } 116 117 impl<P, B> GenerateBundles<P, B> { 118 fn new( 119 reg: L10nRegistry<P, B>, 120 locales: std::vec::IntoIter<LanguageIdentifier>, 121 resource_ids: Vec<ResourceId>, 122 metasources: MetaSources, 123 ) -> Self { 124 Self { 125 reg, 126 metasources, 127 locales, 128 current_metasource: 0, 129 resource_ids, 130 state: State::Empty, 131 } 132 } 133 } 134 135 pub type ResourceSetStream = Collect<FuturesOrdered<ResourceStatus>, Vec<ResourceOption>>; 136 pub struct TestResult(ResourceSetStream); 137 impl std::marker::Unpin for TestResult {} 138 139 impl Future for TestResult { 140 type Output = Vec<bool>; 141 142 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 143 let pinned = Pin::new(&mut self.0); 144 pinned 145 .poll(cx) 146 .map(|set| set.iter().map(|c| !c.is_required_and_missing()).collect()) 147 } 148 } 149 150 impl<'l, P, B> AsyncTester for GenerateBundles<P, B> { 151 type Result = TestResult; 152 153 fn test_async(&self, query: Vec<(usize, usize)>) -> Self::Result { 154 let locale = self.state.get_locale(); 155 156 let stream = query 157 .iter() 158 .map(|(res_idx, source_idx)| { 159 let resource_id = &self.resource_ids[*res_idx]; 160 self.metasources 161 .filesource(self.current_metasource, *source_idx) 162 .fetch_file(locale, resource_id) 163 }) 164 .collect::<FuturesOrdered<_>>(); 165 TestResult(stream.collect::<_>()) 166 } 167 } 168 169 #[async_trait::async_trait(?Send)] 170 impl<P, B> BundleStream for GenerateBundles<P, B> { 171 async fn prefetch_async(&mut self) { 172 todo!(); 173 } 174 } 175 176 /// Generate [FluentBundles](FluentBundle) asynchronously. 177 impl<P, B> Stream for GenerateBundles<P, B> 178 where 179 P: ErrorReporter, 180 B: BundleAdapter, 181 { 182 type Item = Result<FluentBundle, (FluentBundle, Vec<FluentError>)>; 183 184 /// Asynchronously try and get a solver, and then with the solver generate a bundle. 185 /// If the solver is not ready yet, then this function will return as `Pending`, and 186 /// the Future runner will need to re-enter at a later point to try again. 187 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 188 if self.metasources.is_empty() { 189 // There are no metasources available, so no bundles can be generated. 190 return None.into(); 191 } 192 loop { 193 if let State::Solver { .. } = self.state { 194 // A solver has already been set up, continue iterating through the 195 // resources and generating a bundle. 196 197 // Pin the solver so that the async try_poll_next can be called. 198 let mut solver = self.state.take_solver(); 199 let pinned_solver = Pin::new(&mut solver); 200 201 if let std::task::Poll::Ready(solver_result) = 202 pinned_solver.try_poll_next(cx, &self, false) 203 { 204 // The solver is ready, but may not have generated an ordering. 205 206 if let Ok(Some(order)) = solver_result { 207 // The solver resolved an ordering, and a bundle may be able 208 // to be generated. 209 210 let bundle = self.metasources.bundle_from_order( 211 self.current_metasource, 212 self.state.get_locale().clone(), 213 &order, 214 &self.resource_ids, 215 &self.reg.shared.provider, 216 self.reg.shared.bundle_adapter.as_ref(), 217 ); 218 219 self.state.put_back_solver(solver); 220 221 if bundle.is_some() { 222 // The bundle was successfully generated. 223 return bundle.into(); 224 } 225 226 // No bundle was generated, continue on. 227 continue; 228 } 229 230 // There is no bundle ordering available. 231 232 if self.current_metasource > 0 { 233 // There are more metasources, create a new solver and try the 234 // next metasource. If there is an error in the solver_result 235 // ignore it for now, since there are more metasources. 236 self.current_metasource -= 1; 237 let solver = ParallelProblemSolver::new( 238 self.resource_ids.len(), 239 self.metasources.get(self.current_metasource).len(), 240 ); 241 self.state = State::Solver { 242 locale: self.state.get_locale().clone(), 243 solver, 244 }; 245 continue; 246 } 247 248 if let Err(idx) = solver_result { 249 // Since there are no more metasources, and there is an error, 250 // report it instead of ignoring it. 251 self.reg.shared.provider.report_errors(vec![ 252 L10nRegistryError::MissingResource { 253 locale: self.state.get_locale().clone(), 254 resource_id: self.resource_ids[idx].clone(), 255 }, 256 ]); 257 } 258 259 // There are no more metasources. 260 self.state = State::Empty; 261 continue; 262 } 263 264 // The solver is not ready yet, so exit out of this async task 265 // and mark it as pending. It can be tried again later. 266 self.state.put_back_solver(solver); 267 return std::task::Poll::Pending; 268 } 269 270 // There are no more metasources to search. 271 272 // Try the next locale. 273 if let Some(locale) = self.locales.next() { 274 // Restart at the end of the metasources for this locale, and iterate 275 // backwards. 276 let last_metasource_idx = self.metasources.len() - 1; 277 self.current_metasource = last_metasource_idx; 278 279 let solver = ParallelProblemSolver::new( 280 self.resource_ids.len(), 281 self.metasources.get(self.current_metasource).len(), 282 ); 283 self.state = State::Solver { locale, solver }; 284 285 // Continue iterating on the next solver. 286 continue; 287 } 288 289 // There are no more locales or metasources to search. This iterator 290 // is done. 291 return None.into(); 292 } 293 } 294 }