tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

nack_requester.cc (10831B)


      1 /*
      2 *  Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
      3 *
      4 *  Use of this source code is governed by a BSD-style license
      5 *  that can be found in the LICENSE file in the root of the source
      6 *  tree. An additional intellectual property rights grant can be found
      7 *  in the file PATENTS.  All contributing project authors may
      8 *  be found in the AUTHORS file in the root of the source tree.
      9 */
     10 
     11 #include "modules/video_coding/nack_requester.h"
     12 
     13 #include <algorithm>
     14 #include <cstdint>
     15 #include <cstdlib>
     16 #include <vector>
     17 
     18 #include "api/field_trials_view.h"
     19 #include "api/sequence_checker.h"
     20 #include "api/task_queue/task_queue_base.h"
     21 #include "api/units/time_delta.h"
     22 #include "api/units/timestamp.h"
     23 #include "modules/include/module_common_types.h"
     24 #include "rtc_base/checks.h"
     25 #include "rtc_base/logging.h"
     26 #include "rtc_base/numerics/mod_ops.h"
     27 #include "rtc_base/numerics/sequence_number_util.h"
     28 #include "rtc_base/task_utils/repeating_task.h"
     29 #include "system_wrappers/include/clock.h"
     30 
     31 namespace webrtc {
     32 
     33 namespace {
     34 constexpr int kMaxPacketAge = 10'000;
     35 constexpr int kMaxNackPackets = 1000;
     36 constexpr TimeDelta kDefaultRtt = TimeDelta::Millis(100);
     37 // Number of times a packet can be nacked before giving up. Nack is sent at most
     38 // every RTT.
     39 constexpr int kMaxNackRetries = 100;
     40 constexpr int kMaxReorderedPackets = 128;
     41 constexpr int kNumReorderingBuckets = 10;
     42 constexpr TimeDelta kDefaultSendNackDelay = TimeDelta::Zero();
     43 
     44 TimeDelta GetSendNackDelay(const FieldTrialsView& field_trials) {
     45  int64_t delay_ms = strtol(
     46      field_trials.Lookup("WebRTC-SendNackDelayMs").c_str(), nullptr, 10);
     47  if (delay_ms > 0 && delay_ms <= 20) {
     48    RTC_LOG(LS_INFO) << "SendNackDelay is set to " << delay_ms;
     49    return TimeDelta::Millis(delay_ms);
     50  }
     51  return kDefaultSendNackDelay;
     52 }
     53 }  // namespace
     54 
     55 NackPeriodicProcessor::NackPeriodicProcessor(TimeDelta update_interval)
     56    : update_interval_(update_interval) {}
     57 
     58 NackPeriodicProcessor::~NackPeriodicProcessor() {}
     59 
     60 void NackPeriodicProcessor::RegisterNackModule(NackRequesterBase* module) {
     61  RTC_DCHECK_RUN_ON(&sequence_);
     62  modules_.push_back(module);
     63  if (modules_.size() != 1)
     64    return;
     65  repeating_task_ = RepeatingTaskHandle::DelayedStart(
     66      TaskQueueBase::Current(), update_interval_, [this] {
     67        RTC_DCHECK_RUN_ON(&sequence_);
     68        ProcessNackModules();
     69        return update_interval_;
     70      });
     71 }
     72 
     73 void NackPeriodicProcessor::UnregisterNackModule(NackRequesterBase* module) {
     74  RTC_DCHECK_RUN_ON(&sequence_);
     75  auto it = std::find(modules_.begin(), modules_.end(), module);
     76  RTC_DCHECK(it != modules_.end());
     77  modules_.erase(it);
     78  if (modules_.empty())
     79    repeating_task_.Stop();
     80 }
     81 
     82 void NackPeriodicProcessor::ProcessNackModules() {
     83  RTC_DCHECK_RUN_ON(&sequence_);
     84  for (NackRequesterBase* module : modules_)
     85    module->ProcessNacks();
     86 }
     87 
     88 ScopedNackPeriodicProcessorRegistration::
     89    ScopedNackPeriodicProcessorRegistration(NackRequesterBase* module,
     90                                            NackPeriodicProcessor* processor)
     91    : module_(module), processor_(processor) {
     92  processor_->RegisterNackModule(module_);
     93 }
     94 
     95 ScopedNackPeriodicProcessorRegistration::
     96    ~ScopedNackPeriodicProcessorRegistration() {
     97  processor_->UnregisterNackModule(module_);
     98 }
     99 
    100 NackRequester::NackInfo::NackInfo()
    101    : seq_num(0),
    102      send_at_seq_num(0),
    103      created_at_time(Timestamp::MinusInfinity()),
    104      sent_at_time(Timestamp::MinusInfinity()),
    105      retries(0) {}
    106 
    107 NackRequester::NackInfo::NackInfo(uint16_t seq_num,
    108                                  uint16_t send_at_seq_num,
    109                                  Timestamp created_at_time)
    110    : seq_num(seq_num),
    111      send_at_seq_num(send_at_seq_num),
    112      created_at_time(created_at_time),
    113      sent_at_time(Timestamp::MinusInfinity()),
    114      retries(0) {}
    115 
    116 NackRequester::NackRequester(TaskQueueBase* current_queue,
    117                             NackPeriodicProcessor* periodic_processor,
    118                             Clock* clock,
    119                             NackSender* nack_sender,
    120                             KeyFrameRequestSender* keyframe_request_sender,
    121                             const FieldTrialsView& field_trials)
    122    : worker_thread_(current_queue),
    123      clock_(clock),
    124      nack_sender_(nack_sender),
    125      keyframe_request_sender_(keyframe_request_sender),
    126      reordering_histogram_(kNumReorderingBuckets, kMaxReorderedPackets),
    127      initialized_(false),
    128      rtt_(kDefaultRtt),
    129      newest_seq_num_(0),
    130      send_nack_delay_(GetSendNackDelay(field_trials)),
    131      processor_registration_(this, periodic_processor) {
    132  RTC_DCHECK(clock_);
    133  RTC_DCHECK(nack_sender_);
    134  RTC_DCHECK(keyframe_request_sender_);
    135  RTC_DCHECK(worker_thread_);
    136  RTC_DCHECK(worker_thread_->IsCurrent());
    137 }
    138 
    139 NackRequester::~NackRequester() {
    140  RTC_DCHECK_RUN_ON(worker_thread_);
    141 }
    142 
    143 void NackRequester::ProcessNacks() {
    144  RTC_DCHECK_RUN_ON(worker_thread_);
    145  std::vector<uint16_t> nack_batch = GetNackBatch(kTimeOnly);
    146  if (!nack_batch.empty()) {
    147    // This batch of NACKs is triggered externally; there is no external
    148    // initiator who can batch them with other feedback messages.
    149    nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false);
    150  }
    151 }
    152 
    153 int NackRequester::OnReceivedPacket(uint16_t seq_num) {
    154  RTC_DCHECK_RUN_ON(worker_thread_);
    155  return OnReceivedPacket(seq_num, false);
    156 }
    157 
    158 int NackRequester::OnReceivedPacket(uint16_t seq_num, bool is_recovered) {
    159  RTC_DCHECK_RUN_ON(worker_thread_);
    160  // TODO(philipel): When the packet includes information whether it is
    161  //                 retransmitted or not, use that value instead. For
    162  //                 now set it to true, which will cause the reordering
    163  //                 statistics to never be updated.
    164  bool is_retransmitted = true;
    165 
    166  if (!initialized_) {
    167    newest_seq_num_ = seq_num;
    168    initialized_ = true;
    169    return 0;
    170  }
    171 
    172  // Since the `newest_seq_num_` is a packet we have actually received we know
    173  // that packet has never been Nacked.
    174  if (seq_num == newest_seq_num_)
    175    return 0;
    176 
    177  if (AheadOf(newest_seq_num_, seq_num)) {
    178    // An out of order packet has been received.
    179    auto nack_list_it = nack_list_.find(seq_num);
    180    int nacks_sent_for_packet = 0;
    181    if (nack_list_it != nack_list_.end()) {
    182      nacks_sent_for_packet = nack_list_it->second.retries;
    183      nack_list_.erase(nack_list_it);
    184    }
    185    if (!is_retransmitted)
    186      UpdateReorderingStatistics(seq_num);
    187    return nacks_sent_for_packet;
    188  }
    189 
    190  if (is_recovered) {
    191    recovered_list_.insert(seq_num);
    192 
    193    // Remove old ones so we don't accumulate recovered packets.
    194    auto it = recovered_list_.lower_bound(seq_num - kMaxPacketAge);
    195    if (it != recovered_list_.begin())
    196      recovered_list_.erase(recovered_list_.begin(), it);
    197 
    198    // Do not send nack for packets recovered by FEC or RTX.
    199    return 0;
    200  }
    201 
    202  AddPacketsToNack(newest_seq_num_ + 1, seq_num);
    203  newest_seq_num_ = seq_num;
    204 
    205  // Are there any nacks that are waiting for this seq_num.
    206  std::vector<uint16_t> nack_batch = GetNackBatch(kSeqNumOnly);
    207  if (!nack_batch.empty()) {
    208    // This batch of NACKs is triggered externally; the initiator can
    209    // batch them with other feedback messages.
    210    nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/true);
    211  }
    212 
    213  return 0;
    214 }
    215 
    216 void NackRequester::ClearUpTo(uint16_t seq_num) {
    217  // TODO(bugs.webrtc.org/11993): This method is actually called on the worker
    218  // thread even though the caller stack to this call passes thread checkers
    219  // indicating they belong to the network thread. The inline execution below
    220  // needs to be posted to the worker thread if callers migrate to the network
    221  // thread.
    222  RTC_DCHECK_RUN_ON(worker_thread_);
    223  nack_list_.erase(nack_list_.begin(), nack_list_.lower_bound(seq_num));
    224  recovered_list_.erase(recovered_list_.begin(),
    225                        recovered_list_.lower_bound(seq_num));
    226 }
    227 
    228 void NackRequester::UpdateRtt(int64_t rtt_ms) {
    229  RTC_DCHECK_RUN_ON(worker_thread_);
    230  rtt_ = TimeDelta::Millis(rtt_ms);
    231 }
    232 
    233 void NackRequester::AddPacketsToNack(uint16_t seq_num_start,
    234                                     uint16_t seq_num_end) {
    235  // Called on worker_thread_.
    236  // Remove old packets.
    237  auto it = nack_list_.lower_bound(seq_num_end - kMaxPacketAge);
    238  nack_list_.erase(nack_list_.begin(), it);
    239 
    240  uint16_t num_new_nacks = ForwardDiff(seq_num_start, seq_num_end);
    241  if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {
    242    nack_list_.clear();
    243    RTC_LOG(LS_WARNING) << "NACK list full, clearing NACK"
    244                           " list and requesting keyframe.";
    245    keyframe_request_sender_->RequestKeyFrame();
    246    return;
    247  }
    248 
    249  for (uint16_t seq_num = seq_num_start; seq_num != seq_num_end; ++seq_num) {
    250    // Do not send nack for packets that are already recovered by FEC or RTX
    251    if (recovered_list_.find(seq_num) != recovered_list_.end())
    252      continue;
    253    NackInfo nack_info(seq_num, seq_num + WaitNumberOfPackets(0.5),
    254                       clock_->CurrentTime());
    255    RTC_DCHECK(nack_list_.find(seq_num) == nack_list_.end());
    256    nack_list_[seq_num] = nack_info;
    257  }
    258 }
    259 
    260 std::vector<uint16_t> NackRequester::GetNackBatch(NackFilterOptions options) {
    261  // Called on worker_thread_.
    262 
    263  bool consider_seq_num = options != kTimeOnly;
    264  bool consider_timestamp = options != kSeqNumOnly;
    265  Timestamp now = clock_->CurrentTime();
    266  std::vector<uint16_t> nack_batch;
    267  auto it = nack_list_.begin();
    268  while (it != nack_list_.end()) {
    269    bool delay_timed_out = now - it->second.created_at_time >= send_nack_delay_;
    270    bool nack_on_rtt_passed = now - it->second.sent_at_time >= rtt_;
    271    bool nack_on_seq_num_passed =
    272        it->second.sent_at_time.IsInfinite() &&
    273        AheadOrAt(newest_seq_num_, it->second.send_at_seq_num);
    274    if (delay_timed_out && ((consider_seq_num && nack_on_seq_num_passed) ||
    275                            (consider_timestamp && nack_on_rtt_passed))) {
    276      nack_batch.emplace_back(it->second.seq_num);
    277      ++it->second.retries;
    278      it->second.sent_at_time = now;
    279      if (it->second.retries >= kMaxNackRetries) {
    280        RTC_LOG(LS_WARNING) << "Sequence number " << it->second.seq_num
    281                            << " removed from NACK list due to max retries.";
    282        it = nack_list_.erase(it);
    283      } else {
    284        ++it;
    285      }
    286      continue;
    287    }
    288    ++it;
    289  }
    290  return nack_batch;
    291 }
    292 
    293 void NackRequester::UpdateReorderingStatistics(uint16_t seq_num) {
    294  // Running on worker_thread_.
    295  RTC_DCHECK(AheadOf(newest_seq_num_, seq_num));
    296  uint16_t diff = ReverseDiff(newest_seq_num_, seq_num);
    297  reordering_histogram_.Add(diff);
    298 }
    299 
    300 int NackRequester::WaitNumberOfPackets(float probability) const {
    301  // Called on worker_thread_;
    302  if (reordering_histogram_.NumValues() == 0)
    303    return 0;
    304  return reordering_histogram_.InverseCdf(probability);
    305 }
    306 
    307 }  // namespace webrtc