nack_requester.cc (10831B)
1 /* 2 * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/video_coding/nack_requester.h" 12 13 #include <algorithm> 14 #include <cstdint> 15 #include <cstdlib> 16 #include <vector> 17 18 #include "api/field_trials_view.h" 19 #include "api/sequence_checker.h" 20 #include "api/task_queue/task_queue_base.h" 21 #include "api/units/time_delta.h" 22 #include "api/units/timestamp.h" 23 #include "modules/include/module_common_types.h" 24 #include "rtc_base/checks.h" 25 #include "rtc_base/logging.h" 26 #include "rtc_base/numerics/mod_ops.h" 27 #include "rtc_base/numerics/sequence_number_util.h" 28 #include "rtc_base/task_utils/repeating_task.h" 29 #include "system_wrappers/include/clock.h" 30 31 namespace webrtc { 32 33 namespace { 34 constexpr int kMaxPacketAge = 10'000; 35 constexpr int kMaxNackPackets = 1000; 36 constexpr TimeDelta kDefaultRtt = TimeDelta::Millis(100); 37 // Number of times a packet can be nacked before giving up. Nack is sent at most 38 // every RTT. 39 constexpr int kMaxNackRetries = 100; 40 constexpr int kMaxReorderedPackets = 128; 41 constexpr int kNumReorderingBuckets = 10; 42 constexpr TimeDelta kDefaultSendNackDelay = TimeDelta::Zero(); 43 44 TimeDelta GetSendNackDelay(const FieldTrialsView& field_trials) { 45 int64_t delay_ms = strtol( 46 field_trials.Lookup("WebRTC-SendNackDelayMs").c_str(), nullptr, 10); 47 if (delay_ms > 0 && delay_ms <= 20) { 48 RTC_LOG(LS_INFO) << "SendNackDelay is set to " << delay_ms; 49 return TimeDelta::Millis(delay_ms); 50 } 51 return kDefaultSendNackDelay; 52 } 53 } // namespace 54 55 NackPeriodicProcessor::NackPeriodicProcessor(TimeDelta update_interval) 56 : update_interval_(update_interval) {} 57 58 NackPeriodicProcessor::~NackPeriodicProcessor() {} 59 60 void NackPeriodicProcessor::RegisterNackModule(NackRequesterBase* module) { 61 RTC_DCHECK_RUN_ON(&sequence_); 62 modules_.push_back(module); 63 if (modules_.size() != 1) 64 return; 65 repeating_task_ = RepeatingTaskHandle::DelayedStart( 66 TaskQueueBase::Current(), update_interval_, [this] { 67 RTC_DCHECK_RUN_ON(&sequence_); 68 ProcessNackModules(); 69 return update_interval_; 70 }); 71 } 72 73 void NackPeriodicProcessor::UnregisterNackModule(NackRequesterBase* module) { 74 RTC_DCHECK_RUN_ON(&sequence_); 75 auto it = std::find(modules_.begin(), modules_.end(), module); 76 RTC_DCHECK(it != modules_.end()); 77 modules_.erase(it); 78 if (modules_.empty()) 79 repeating_task_.Stop(); 80 } 81 82 void NackPeriodicProcessor::ProcessNackModules() { 83 RTC_DCHECK_RUN_ON(&sequence_); 84 for (NackRequesterBase* module : modules_) 85 module->ProcessNacks(); 86 } 87 88 ScopedNackPeriodicProcessorRegistration:: 89 ScopedNackPeriodicProcessorRegistration(NackRequesterBase* module, 90 NackPeriodicProcessor* processor) 91 : module_(module), processor_(processor) { 92 processor_->RegisterNackModule(module_); 93 } 94 95 ScopedNackPeriodicProcessorRegistration:: 96 ~ScopedNackPeriodicProcessorRegistration() { 97 processor_->UnregisterNackModule(module_); 98 } 99 100 NackRequester::NackInfo::NackInfo() 101 : seq_num(0), 102 send_at_seq_num(0), 103 created_at_time(Timestamp::MinusInfinity()), 104 sent_at_time(Timestamp::MinusInfinity()), 105 retries(0) {} 106 107 NackRequester::NackInfo::NackInfo(uint16_t seq_num, 108 uint16_t send_at_seq_num, 109 Timestamp created_at_time) 110 : seq_num(seq_num), 111 send_at_seq_num(send_at_seq_num), 112 created_at_time(created_at_time), 113 sent_at_time(Timestamp::MinusInfinity()), 114 retries(0) {} 115 116 NackRequester::NackRequester(TaskQueueBase* current_queue, 117 NackPeriodicProcessor* periodic_processor, 118 Clock* clock, 119 NackSender* nack_sender, 120 KeyFrameRequestSender* keyframe_request_sender, 121 const FieldTrialsView& field_trials) 122 : worker_thread_(current_queue), 123 clock_(clock), 124 nack_sender_(nack_sender), 125 keyframe_request_sender_(keyframe_request_sender), 126 reordering_histogram_(kNumReorderingBuckets, kMaxReorderedPackets), 127 initialized_(false), 128 rtt_(kDefaultRtt), 129 newest_seq_num_(0), 130 send_nack_delay_(GetSendNackDelay(field_trials)), 131 processor_registration_(this, periodic_processor) { 132 RTC_DCHECK(clock_); 133 RTC_DCHECK(nack_sender_); 134 RTC_DCHECK(keyframe_request_sender_); 135 RTC_DCHECK(worker_thread_); 136 RTC_DCHECK(worker_thread_->IsCurrent()); 137 } 138 139 NackRequester::~NackRequester() { 140 RTC_DCHECK_RUN_ON(worker_thread_); 141 } 142 143 void NackRequester::ProcessNacks() { 144 RTC_DCHECK_RUN_ON(worker_thread_); 145 std::vector<uint16_t> nack_batch = GetNackBatch(kTimeOnly); 146 if (!nack_batch.empty()) { 147 // This batch of NACKs is triggered externally; there is no external 148 // initiator who can batch them with other feedback messages. 149 nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false); 150 } 151 } 152 153 int NackRequester::OnReceivedPacket(uint16_t seq_num) { 154 RTC_DCHECK_RUN_ON(worker_thread_); 155 return OnReceivedPacket(seq_num, false); 156 } 157 158 int NackRequester::OnReceivedPacket(uint16_t seq_num, bool is_recovered) { 159 RTC_DCHECK_RUN_ON(worker_thread_); 160 // TODO(philipel): When the packet includes information whether it is 161 // retransmitted or not, use that value instead. For 162 // now set it to true, which will cause the reordering 163 // statistics to never be updated. 164 bool is_retransmitted = true; 165 166 if (!initialized_) { 167 newest_seq_num_ = seq_num; 168 initialized_ = true; 169 return 0; 170 } 171 172 // Since the `newest_seq_num_` is a packet we have actually received we know 173 // that packet has never been Nacked. 174 if (seq_num == newest_seq_num_) 175 return 0; 176 177 if (AheadOf(newest_seq_num_, seq_num)) { 178 // An out of order packet has been received. 179 auto nack_list_it = nack_list_.find(seq_num); 180 int nacks_sent_for_packet = 0; 181 if (nack_list_it != nack_list_.end()) { 182 nacks_sent_for_packet = nack_list_it->second.retries; 183 nack_list_.erase(nack_list_it); 184 } 185 if (!is_retransmitted) 186 UpdateReorderingStatistics(seq_num); 187 return nacks_sent_for_packet; 188 } 189 190 if (is_recovered) { 191 recovered_list_.insert(seq_num); 192 193 // Remove old ones so we don't accumulate recovered packets. 194 auto it = recovered_list_.lower_bound(seq_num - kMaxPacketAge); 195 if (it != recovered_list_.begin()) 196 recovered_list_.erase(recovered_list_.begin(), it); 197 198 // Do not send nack for packets recovered by FEC or RTX. 199 return 0; 200 } 201 202 AddPacketsToNack(newest_seq_num_ + 1, seq_num); 203 newest_seq_num_ = seq_num; 204 205 // Are there any nacks that are waiting for this seq_num. 206 std::vector<uint16_t> nack_batch = GetNackBatch(kSeqNumOnly); 207 if (!nack_batch.empty()) { 208 // This batch of NACKs is triggered externally; the initiator can 209 // batch them with other feedback messages. 210 nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/true); 211 } 212 213 return 0; 214 } 215 216 void NackRequester::ClearUpTo(uint16_t seq_num) { 217 // TODO(bugs.webrtc.org/11993): This method is actually called on the worker 218 // thread even though the caller stack to this call passes thread checkers 219 // indicating they belong to the network thread. The inline execution below 220 // needs to be posted to the worker thread if callers migrate to the network 221 // thread. 222 RTC_DCHECK_RUN_ON(worker_thread_); 223 nack_list_.erase(nack_list_.begin(), nack_list_.lower_bound(seq_num)); 224 recovered_list_.erase(recovered_list_.begin(), 225 recovered_list_.lower_bound(seq_num)); 226 } 227 228 void NackRequester::UpdateRtt(int64_t rtt_ms) { 229 RTC_DCHECK_RUN_ON(worker_thread_); 230 rtt_ = TimeDelta::Millis(rtt_ms); 231 } 232 233 void NackRequester::AddPacketsToNack(uint16_t seq_num_start, 234 uint16_t seq_num_end) { 235 // Called on worker_thread_. 236 // Remove old packets. 237 auto it = nack_list_.lower_bound(seq_num_end - kMaxPacketAge); 238 nack_list_.erase(nack_list_.begin(), it); 239 240 uint16_t num_new_nacks = ForwardDiff(seq_num_start, seq_num_end); 241 if (nack_list_.size() + num_new_nacks > kMaxNackPackets) { 242 nack_list_.clear(); 243 RTC_LOG(LS_WARNING) << "NACK list full, clearing NACK" 244 " list and requesting keyframe."; 245 keyframe_request_sender_->RequestKeyFrame(); 246 return; 247 } 248 249 for (uint16_t seq_num = seq_num_start; seq_num != seq_num_end; ++seq_num) { 250 // Do not send nack for packets that are already recovered by FEC or RTX 251 if (recovered_list_.find(seq_num) != recovered_list_.end()) 252 continue; 253 NackInfo nack_info(seq_num, seq_num + WaitNumberOfPackets(0.5), 254 clock_->CurrentTime()); 255 RTC_DCHECK(nack_list_.find(seq_num) == nack_list_.end()); 256 nack_list_[seq_num] = nack_info; 257 } 258 } 259 260 std::vector<uint16_t> NackRequester::GetNackBatch(NackFilterOptions options) { 261 // Called on worker_thread_. 262 263 bool consider_seq_num = options != kTimeOnly; 264 bool consider_timestamp = options != kSeqNumOnly; 265 Timestamp now = clock_->CurrentTime(); 266 std::vector<uint16_t> nack_batch; 267 auto it = nack_list_.begin(); 268 while (it != nack_list_.end()) { 269 bool delay_timed_out = now - it->second.created_at_time >= send_nack_delay_; 270 bool nack_on_rtt_passed = now - it->second.sent_at_time >= rtt_; 271 bool nack_on_seq_num_passed = 272 it->second.sent_at_time.IsInfinite() && 273 AheadOrAt(newest_seq_num_, it->second.send_at_seq_num); 274 if (delay_timed_out && ((consider_seq_num && nack_on_seq_num_passed) || 275 (consider_timestamp && nack_on_rtt_passed))) { 276 nack_batch.emplace_back(it->second.seq_num); 277 ++it->second.retries; 278 it->second.sent_at_time = now; 279 if (it->second.retries >= kMaxNackRetries) { 280 RTC_LOG(LS_WARNING) << "Sequence number " << it->second.seq_num 281 << " removed from NACK list due to max retries."; 282 it = nack_list_.erase(it); 283 } else { 284 ++it; 285 } 286 continue; 287 } 288 ++it; 289 } 290 return nack_batch; 291 } 292 293 void NackRequester::UpdateReorderingStatistics(uint16_t seq_num) { 294 // Running on worker_thread_. 295 RTC_DCHECK(AheadOf(newest_seq_num_, seq_num)); 296 uint16_t diff = ReverseDiff(newest_seq_num_, seq_num); 297 reordering_histogram_.Add(diff); 298 } 299 300 int NackRequester::WaitNumberOfPackets(float probability) const { 301 // Called on worker_thread_; 302 if (reordering_histogram_.NumValues() == 0) 303 return 0; 304 return reordering_histogram_.InverseCdf(probability); 305 } 306 307 } // namespace webrtc