resource_adaptation_processor.cc (15855B)
1 /* 2 * Copyright 2020 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "call/adaptation/resource_adaptation_processor.h" 12 13 #include <algorithm> 14 #include <map> 15 #include <string> 16 #include <tuple> 17 #include <utility> 18 #include <vector> 19 20 #include "absl/algorithm/container.h" 21 #include "absl/strings/string_view.h" 22 #include "api/adaptation/resource.h" 23 #include "api/make_ref_counted.h" 24 #include "api/scoped_refptr.h" 25 #include "api/sequence_checker.h" 26 #include "api/task_queue/task_queue_base.h" 27 #include "api/video/video_adaptation_counters.h" 28 #include "call/adaptation/resource_adaptation_processor_interface.h" 29 #include "call/adaptation/video_source_restrictions.h" 30 #include "call/adaptation/video_stream_adapter.h" 31 #include "rtc_base/checks.h" 32 #include "rtc_base/logging.h" 33 #include "rtc_base/strings/string_builder.h" 34 #include "rtc_base/synchronization/mutex.h" 35 36 namespace webrtc { 37 38 ResourceAdaptationProcessor::ResourceListenerDelegate::ResourceListenerDelegate( 39 ResourceAdaptationProcessor* processor) 40 : task_queue_(TaskQueueBase::Current()), processor_(processor) { 41 RTC_DCHECK(task_queue_); 42 } 43 44 void ResourceAdaptationProcessor::ResourceListenerDelegate:: 45 OnProcessorDestroyed() { 46 RTC_DCHECK_RUN_ON(task_queue_); 47 processor_ = nullptr; 48 } 49 50 void ResourceAdaptationProcessor::ResourceListenerDelegate:: 51 OnResourceUsageStateMeasured(scoped_refptr<Resource> resource, 52 ResourceUsageState usage_state) { 53 if (!task_queue_->IsCurrent()) { 54 task_queue_->PostTask( 55 [this_ref = scoped_refptr<ResourceListenerDelegate>(this), resource, 56 usage_state] { 57 this_ref->OnResourceUsageStateMeasured(resource, usage_state); 58 }); 59 return; 60 } 61 RTC_DCHECK_RUN_ON(task_queue_); 62 if (processor_) { 63 processor_->OnResourceUsageStateMeasured(resource, usage_state); 64 } 65 } 66 67 ResourceAdaptationProcessor::MitigationResultAndLogMessage:: 68 MitigationResultAndLogMessage() 69 : result(MitigationResult::kAdaptationApplied), message() {} 70 71 ResourceAdaptationProcessor::MitigationResultAndLogMessage:: 72 MitigationResultAndLogMessage(MitigationResult result, 73 absl::string_view message) 74 : result(result), message(message) {} 75 76 ResourceAdaptationProcessor::ResourceAdaptationProcessor( 77 VideoStreamAdapter* stream_adapter) 78 : task_queue_(TaskQueueBase::Current()), 79 resource_listener_delegate_( 80 make_ref_counted<ResourceListenerDelegate>(this)), 81 resources_(), 82 stream_adapter_(stream_adapter), 83 last_reported_source_restrictions_(), 84 previous_mitigation_results_() { 85 RTC_DCHECK(task_queue_); 86 stream_adapter_->AddRestrictionsListener(this); 87 } 88 89 ResourceAdaptationProcessor::~ResourceAdaptationProcessor() { 90 RTC_DCHECK_RUN_ON(task_queue_); 91 RTC_DCHECK(resources_.empty()) 92 << "There are resource(s) attached to a ResourceAdaptationProcessor " 93 << "being destroyed."; 94 stream_adapter_->RemoveRestrictionsListener(this); 95 resource_listener_delegate_->OnProcessorDestroyed(); 96 } 97 98 void ResourceAdaptationProcessor::AddResourceLimitationsListener( 99 ResourceLimitationsListener* limitations_listener) { 100 RTC_DCHECK_RUN_ON(task_queue_); 101 RTC_DCHECK(std::find(resource_limitations_listeners_.begin(), 102 resource_limitations_listeners_.end(), 103 limitations_listener) == 104 resource_limitations_listeners_.end()); 105 resource_limitations_listeners_.push_back(limitations_listener); 106 } 107 108 void ResourceAdaptationProcessor::RemoveResourceLimitationsListener( 109 ResourceLimitationsListener* limitations_listener) { 110 RTC_DCHECK_RUN_ON(task_queue_); 111 auto it = 112 std::find(resource_limitations_listeners_.begin(), 113 resource_limitations_listeners_.end(), limitations_listener); 114 RTC_DCHECK(it != resource_limitations_listeners_.end()); 115 resource_limitations_listeners_.erase(it); 116 } 117 118 void ResourceAdaptationProcessor::AddResource( 119 scoped_refptr<Resource> resource) { 120 RTC_DCHECK(resource); 121 { 122 MutexLock crit(&resources_lock_); 123 RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end()) 124 << "Resource \"" << resource->Name() << "\" was already registered."; 125 resources_.push_back(resource); 126 } 127 resource->SetResourceListener(resource_listener_delegate_.get()); 128 RTC_LOG(LS_INFO) << "Registered resource \"" << resource->Name() << "\"."; 129 } 130 131 std::vector<scoped_refptr<Resource>> ResourceAdaptationProcessor::GetResources() 132 const { 133 MutexLock crit(&resources_lock_); 134 return resources_; 135 } 136 137 void ResourceAdaptationProcessor::RemoveResource( 138 scoped_refptr<Resource> resource) { 139 RTC_DCHECK(resource); 140 RTC_LOG(LS_INFO) << "Removing resource \"" << resource->Name() << "\"."; 141 resource->SetResourceListener(nullptr); 142 { 143 MutexLock crit(&resources_lock_); 144 auto it = absl::c_find(resources_, resource); 145 RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name() 146 << "\" was not a registered resource."; 147 resources_.erase(it); 148 } 149 RemoveLimitationsImposedByResource(std::move(resource)); 150 } 151 152 void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource( 153 scoped_refptr<Resource> resource) { 154 if (!task_queue_->IsCurrent()) { 155 task_queue_->PostTask( 156 [this, resource]() { RemoveLimitationsImposedByResource(resource); }); 157 return; 158 } 159 RTC_DCHECK_RUN_ON(task_queue_); 160 auto resource_adaptation_limits = 161 adaptation_limits_by_resources_.find(resource); 162 if (resource_adaptation_limits != adaptation_limits_by_resources_.end()) { 163 VideoStreamAdapter::RestrictionsWithCounters adaptation_limits = 164 resource_adaptation_limits->second; 165 adaptation_limits_by_resources_.erase(resource_adaptation_limits); 166 if (adaptation_limits_by_resources_.empty()) { 167 // Only the resource being removed was adapted so clear restrictions. 168 stream_adapter_->ClearRestrictions(); 169 return; 170 } 171 172 VideoStreamAdapter::RestrictionsWithCounters most_limited = 173 FindMostLimitedResources().second; 174 175 if (adaptation_limits.counters.Total() <= most_limited.counters.Total()) { 176 // The removed limitations were less limited than the most limited 177 // resource. Don't change the current restrictions. 178 return; 179 } 180 181 // Apply the new most limited resource as the next restrictions. 182 Adaptation adapt_to = stream_adapter_->GetAdaptationTo( 183 most_limited.counters, most_limited.restrictions); 184 RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid); 185 stream_adapter_->ApplyAdaptation(adapt_to, nullptr); 186 187 RTC_LOG(LS_INFO) 188 << "Most limited resource removed. Restoring restrictions to " 189 "next most limited restrictions: " 190 << most_limited.restrictions.ToString() << " with counters " 191 << most_limited.counters.ToString(); 192 } 193 } 194 195 void ResourceAdaptationProcessor::OnResourceUsageStateMeasured( 196 scoped_refptr<Resource> resource, 197 ResourceUsageState usage_state) { 198 RTC_DCHECK_RUN_ON(task_queue_); 199 RTC_DCHECK(resource); 200 // `resource` could have been removed after signalling. 201 { 202 MutexLock crit(&resources_lock_); 203 if (absl::c_find(resources_, resource) == resources_.end()) { 204 RTC_LOG(LS_INFO) << "Ignoring signal from removed resource \"" 205 << resource->Name() << "\"."; 206 return; 207 } 208 } 209 MitigationResultAndLogMessage result_and_message; 210 switch (usage_state) { 211 case ResourceUsageState::kOveruse: 212 result_and_message = OnResourceOveruse(resource); 213 break; 214 case ResourceUsageState::kUnderuse: 215 result_and_message = OnResourceUnderuse(resource); 216 break; 217 } 218 // Maybe log the result of the operation. 219 auto it = previous_mitigation_results_.find(resource.get()); 220 if (it != previous_mitigation_results_.end() && 221 it->second == result_and_message.result) { 222 // This resource has previously reported the same result and we haven't 223 // successfully adapted since - don't log to avoid spam. 224 return; 225 } 226 RTC_LOG(LS_INFO) << "Resource \"" << resource->Name() << "\" signalled " 227 << ResourceUsageStateToString(usage_state) << ". " 228 << result_and_message.message; 229 if (result_and_message.result == MitigationResult::kAdaptationApplied) { 230 previous_mitigation_results_.clear(); 231 } else { 232 previous_mitigation_results_.insert( 233 std::make_pair(resource.get(), result_and_message.result)); 234 } 235 } 236 237 ResourceAdaptationProcessor::MitigationResultAndLogMessage 238 ResourceAdaptationProcessor::OnResourceUnderuse( 239 scoped_refptr<Resource> reason_resource) { 240 RTC_DCHECK_RUN_ON(task_queue_); 241 // How can this stream be adapted up? 242 Adaptation adaptation = stream_adapter_->GetAdaptationUp(); 243 if (adaptation.status() != Adaptation::Status::kValid) { 244 StringBuilder message; 245 message << "Not adapting up because VideoStreamAdapter returned " 246 << Adaptation::StatusToString(adaptation.status()); 247 return MitigationResultAndLogMessage(MitigationResult::kRejectedByAdapter, 248 message.Release()); 249 } 250 // Check that resource is most limited. 251 std::vector<scoped_refptr<Resource>> most_limited_resources; 252 VideoStreamAdapter::RestrictionsWithCounters most_limited_restrictions; 253 std::tie(most_limited_resources, most_limited_restrictions) = 254 FindMostLimitedResources(); 255 256 // If the most restricted resource is less limited than current restrictions 257 // then proceed with adapting up. 258 if (!most_limited_resources.empty() && 259 most_limited_restrictions.counters.Total() >= 260 stream_adapter_->adaptation_counters().Total()) { 261 // If `reason_resource` is not one of the most limiting resources then abort 262 // adaptation. 263 if (absl::c_find(most_limited_resources, reason_resource) == 264 most_limited_resources.end()) { 265 StringBuilder message; 266 message << "Resource \"" << reason_resource->Name() 267 << "\" was not the most limited resource."; 268 return MitigationResultAndLogMessage( 269 MitigationResult::kNotMostLimitedResource, message.Release()); 270 } 271 272 if (most_limited_resources.size() > 1) { 273 // If there are multiple most limited resources, all must signal underuse 274 // before the adaptation is applied. 275 UpdateResourceLimitations(reason_resource, adaptation.restrictions(), 276 adaptation.counters()); 277 StringBuilder message; 278 message << "Resource \"" << reason_resource->Name() 279 << "\" was not the only most limited resource."; 280 return MitigationResultAndLogMessage( 281 MitigationResult::kSharedMostLimitedResource, message.Release()); 282 } 283 } 284 // Apply adaptation. 285 stream_adapter_->ApplyAdaptation(adaptation, reason_resource); 286 StringBuilder message; 287 message << "Adapted up successfully. Unfiltered adaptations: " 288 << stream_adapter_->adaptation_counters().ToString(); 289 return MitigationResultAndLogMessage(MitigationResult::kAdaptationApplied, 290 message.Release()); 291 } 292 293 ResourceAdaptationProcessor::MitigationResultAndLogMessage 294 ResourceAdaptationProcessor::OnResourceOveruse( 295 scoped_refptr<Resource> reason_resource) { 296 RTC_DCHECK_RUN_ON(task_queue_); 297 // How can this stream be adapted up? 298 Adaptation adaptation = stream_adapter_->GetAdaptationDown(); 299 if (adaptation.status() == Adaptation::Status::kLimitReached) { 300 // Add resource as most limited. 301 VideoStreamAdapter::RestrictionsWithCounters restrictions; 302 std::tie(std::ignore, restrictions) = FindMostLimitedResources(); 303 UpdateResourceLimitations(reason_resource, restrictions.restrictions, 304 restrictions.counters); 305 } 306 if (adaptation.status() != Adaptation::Status::kValid) { 307 StringBuilder message; 308 message << "Not adapting down because VideoStreamAdapter returned " 309 << Adaptation::StatusToString(adaptation.status()); 310 return MitigationResultAndLogMessage(MitigationResult::kRejectedByAdapter, 311 message.Release()); 312 } 313 // Apply adaptation. 314 UpdateResourceLimitations(reason_resource, adaptation.restrictions(), 315 adaptation.counters()); 316 stream_adapter_->ApplyAdaptation(adaptation, reason_resource); 317 StringBuilder message; 318 message << "Adapted down successfully. Unfiltered adaptations: " 319 << stream_adapter_->adaptation_counters().ToString(); 320 return MitigationResultAndLogMessage(MitigationResult::kAdaptationApplied, 321 message.Release()); 322 } 323 324 std::pair<std::vector<scoped_refptr<Resource>>, 325 VideoStreamAdapter::RestrictionsWithCounters> 326 ResourceAdaptationProcessor::FindMostLimitedResources() const { 327 std::vector<scoped_refptr<Resource>> most_limited_resources; 328 VideoStreamAdapter::RestrictionsWithCounters most_limited_restrictions{ 329 .restrictions = VideoSourceRestrictions(), 330 .counters = VideoAdaptationCounters()}; 331 332 for (const auto& resource_and_adaptation_limit_ : 333 adaptation_limits_by_resources_) { 334 const auto& restrictions_with_counters = 335 resource_and_adaptation_limit_.second; 336 if (restrictions_with_counters.counters.Total() > 337 most_limited_restrictions.counters.Total()) { 338 most_limited_restrictions = restrictions_with_counters; 339 most_limited_resources.clear(); 340 most_limited_resources.push_back(resource_and_adaptation_limit_.first); 341 } else if (most_limited_restrictions.counters == 342 restrictions_with_counters.counters) { 343 most_limited_resources.push_back(resource_and_adaptation_limit_.first); 344 } 345 } 346 return std::make_pair(std::move(most_limited_resources), 347 most_limited_restrictions); 348 } 349 350 void ResourceAdaptationProcessor::UpdateResourceLimitations( 351 scoped_refptr<Resource> reason_resource, 352 const VideoSourceRestrictions& restrictions, 353 const VideoAdaptationCounters& counters) { 354 auto& adaptation_limits = adaptation_limits_by_resources_[reason_resource]; 355 if (adaptation_limits.restrictions == restrictions && 356 adaptation_limits.counters == counters) { 357 return; 358 } 359 adaptation_limits = {.restrictions = restrictions, .counters = counters}; 360 361 std::map<scoped_refptr<Resource>, VideoAdaptationCounters> limitations; 362 for (const auto& p : adaptation_limits_by_resources_) { 363 limitations.insert(std::make_pair(p.first, p.second.counters)); 364 } 365 for (auto limitations_listener : resource_limitations_listeners_) { 366 limitations_listener->OnResourceLimitationChanged(reason_resource, 367 limitations); 368 } 369 } 370 371 void ResourceAdaptationProcessor::OnVideoSourceRestrictionsUpdated( 372 VideoSourceRestrictions /* restrictions */, 373 const VideoAdaptationCounters& adaptation_counters, 374 scoped_refptr<Resource> reason, 375 const VideoSourceRestrictions& unfiltered_restrictions) { 376 RTC_DCHECK_RUN_ON(task_queue_); 377 if (reason) { 378 UpdateResourceLimitations(reason, unfiltered_restrictions, 379 adaptation_counters); 380 } else if (adaptation_counters.Total() == 0) { 381 // Adaptations are cleared. 382 adaptation_limits_by_resources_.clear(); 383 previous_mitigation_results_.clear(); 384 for (auto limitations_listener : resource_limitations_listeners_) { 385 limitations_listener->OnResourceLimitationChanged(nullptr, {}); 386 } 387 } 388 } 389 390 } // namespace webrtc