tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

task_queue_paced_sender.cc (10669B)


      1 /*
      2 *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
      3 *
      4 *  Use of this source code is governed by a BSD-style license
      5 *  that can be found in the LICENSE file in the root of the source
      6 *  tree. An additional intellectual property rights grant can be found
      7 *  in the file PATENTS.  All contributing project authors may
      8 *  be found in the AUTHORS file in the root of the source tree.
      9 */
     10 
     11 #include "modules/pacing/task_queue_paced_sender.h"
     12 
     13 #include <algorithm>
     14 #include <cstddef>
     15 #include <cstdint>
     16 #include <memory>
     17 #include <optional>
     18 #include <utility>
     19 #include <vector>
     20 
     21 #include "absl/cleanup/cleanup.h"
     22 #include "api/field_trials_view.h"
     23 #include "api/sequence_checker.h"
     24 #include "api/task_queue/pending_task_safety_flag.h"
     25 #include "api/task_queue/task_queue_base.h"
     26 #include "api/transport/network_types.h"
     27 #include "api/units/data_rate.h"
     28 #include "api/units/data_size.h"
     29 #include "api/units/time_delta.h"
     30 #include "api/units/timestamp.h"
     31 #include "modules/pacing/pacing_controller.h"
     32 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
     33 #include "rtc_base/checks.h"
     34 #include "rtc_base/numerics/exp_filter.h"
     35 #include "rtc_base/trace_event.h"
     36 
     37 namespace webrtc {
     38 
// Sentinel for the `max_hold_back_window_in_packets` constructor argument:
// when that argument equals this value, MaybeProcessPackets() does not apply
// the packet-count based cap to the hold-back window.
const int TaskQueuePacedSender::kNoPacketHoldback = -1;
     40 
     41 TaskQueuePacedSender::TaskQueuePacedSender(
     42    Clock* clock,
     43    PacingController::PacketSender* packet_sender,
     44    const FieldTrialsView& field_trials,
     45    TimeDelta max_hold_back_window,
     46    int max_hold_back_window_in_packets)
     47    : clock_(clock),
     48      max_hold_back_window_(max_hold_back_window),
     49      max_hold_back_window_in_packets_(max_hold_back_window_in_packets),
     50      pacing_controller_(clock, packet_sender, field_trials),
     51      next_process_time_(Timestamp::MinusInfinity()),
     52      is_started_(false),
     53      is_shutdown_(false),
     54      packet_size_(/*alpha=*/0.95),
     55      include_overhead_(false),
     56      task_queue_(TaskQueueBase::Current()) {
     57  RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime);
     58 }
     59 
     60 TaskQueuePacedSender::~TaskQueuePacedSender() {
     61  RTC_DCHECK_RUN_ON(task_queue_);
     62  is_shutdown_ = true;
     63 }
     64 
     65 void TaskQueuePacedSender::SetSendBurstInterval(TimeDelta burst_interval) {
     66  RTC_DCHECK_RUN_ON(task_queue_);
     67  pacing_controller_.SetSendBurstInterval(burst_interval);
     68 }
     69 
     70 void TaskQueuePacedSender::SetAllowProbeWithoutMediaPacket(bool allow) {
     71  RTC_DCHECK_RUN_ON(task_queue_);
     72  pacing_controller_.SetAllowProbeWithoutMediaPacket(allow);
     73 }
     74 
     75 void TaskQueuePacedSender::EnsureStarted() {
     76  RTC_DCHECK_RUN_ON(task_queue_);
     77  is_started_ = true;
     78  PostMaybeProcessPackets();
     79 }
     80 
     81 void TaskQueuePacedSender::CreateProbeClusters(
     82    std::vector<ProbeClusterConfig> probe_cluster_configs) {
     83  RTC_DCHECK_RUN_ON(task_queue_);
     84  pacing_controller_.CreateProbeClusters(probe_cluster_configs);
     85 
     86  // Probing should be scheduled regardless of if the queue is empty or not in
     87  // order to be able to BWE probe before media is sent.
     88  task_queue_->PostTask(SafeTask(safety_.flag(), [this]() {
     89    RTC_DCHECK_RUN_ON(task_queue_);
     90    MaybeProcessPackets(Timestamp::MinusInfinity());
     91  }));
     92 }
     93 
     94 void TaskQueuePacedSender::Pause() {
     95  RTC_DCHECK_RUN_ON(task_queue_);
     96  pacing_controller_.Pause();
     97 }
     98 
     99 void TaskQueuePacedSender::Resume() {
    100  RTC_DCHECK_RUN_ON(task_queue_);
    101  pacing_controller_.Resume();
    102  PostMaybeProcessPackets();
    103 }
    104 
    105 void TaskQueuePacedSender::SetCongested(bool congested) {
    106  RTC_DCHECK_RUN_ON(task_queue_);
    107  pacing_controller_.SetCongested(congested);
    108  PostMaybeProcessPackets();
    109 }
    110 
    111 void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
    112                                          DataRate padding_rate) {
    113  RTC_DCHECK_RUN_ON(task_queue_);
    114  pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
    115  PostMaybeProcessPackets();
    116 }
    117 
    118 void TaskQueuePacedSender::EnqueuePackets(
    119    std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
    120  task_queue_->PostTask(
    121      SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {
    122        RTC_DCHECK_RUN_ON(task_queue_);
    123        TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
    124                     "TaskQueuePacedSender::EnqueuePackets");
    125        for (auto& packet : packets) {
    126          TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
    127                       "TaskQueuePacedSender::EnqueuePackets::Loop",
    128                       "sequence_number", packet->SequenceNumber(),
    129                       "rtp_timestamp", packet->Timestamp());
    130 
    131          size_t packet_size = packet->payload_size() + packet->padding_size();
    132          if (include_overhead_) {
    133            packet_size += packet->headers_size();
    134          }
    135          packet_size_.Apply(1, packet_size);
    136          RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
    137          pacing_controller_.EnqueuePacket(std::move(packet));
    138        }
    139        MaybeProcessPackets(Timestamp::MinusInfinity());
    140      }));
    141 }
    142 
    143 void TaskQueuePacedSender::RemovePacketsForSsrc(uint32_t ssrc) {
    144  task_queue_->PostTask(SafeTask(safety_.flag(), [this, ssrc] {
    145    RTC_DCHECK_RUN_ON(task_queue_);
    146    pacing_controller_.RemovePacketsForSsrc(ssrc);
    147    MaybeProcessPackets(Timestamp::MinusInfinity());
    148  }));
    149 }
    150 
    151 void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
    152  RTC_DCHECK_RUN_ON(task_queue_);
    153  pacing_controller_.SetAccountForAudioPackets(account_for_audio);
    154  PostMaybeProcessPackets();
    155 }
    156 
    157 void TaskQueuePacedSender::SetIncludeOverhead() {
    158  RTC_DCHECK_RUN_ON(task_queue_);
    159  include_overhead_ = true;
    160  pacing_controller_.SetIncludeOverhead();
    161  PostMaybeProcessPackets();
    162 }
    163 
    164 void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
    165  RTC_DCHECK_RUN_ON(task_queue_);
    166  pacing_controller_.SetTransportOverhead(overhead_per_packet);
    167  PostMaybeProcessPackets();
    168 }
    169 
    170 void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
    171  RTC_DCHECK_RUN_ON(task_queue_);
    172  pacing_controller_.SetQueueTimeLimit(limit);
    173  PostMaybeProcessPackets();
    174 }
    175 
    176 TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
    177  return GetStats().expected_queue_time;
    178 }
    179 
    180 DataSize TaskQueuePacedSender::QueueSizeData() const {
    181  return GetStats().queue_size;
    182 }
    183 
    184 std::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
    185  return GetStats().first_sent_packet_time;
    186 }
    187 
    188 TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
    189  Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
    190  if (oldest_packet.IsInfinite()) {
    191    return TimeDelta::Zero();
    192  }
    193 
    194  // (webrtc:9716): The clock is not always monotonic.
    195  Timestamp current = clock_->CurrentTime();
    196  if (current < oldest_packet) {
    197    return TimeDelta::Zero();
    198  }
    199 
    200  return current - oldest_packet;
    201 }
    202 
    203 void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
    204  RTC_DCHECK_RUN_ON(task_queue_);
    205  current_stats_ = stats;
    206 }
    207 
    208 void TaskQueuePacedSender::PostMaybeProcessPackets() {
    209  if (pacing_controller_.QueueSizePackets() == 0) {
    210    return;
    211  }
    212  task_queue_->PostTask(SafeTask(safety_.flag(), [this]() {
    213    RTC_DCHECK_RUN_ON(task_queue_);
    214    MaybeProcessPackets(Timestamp::MinusInfinity());
    215  }));
    216 }
    217 
    218 void TaskQueuePacedSender::MaybeProcessPackets(
    219    Timestamp scheduled_process_time) {
    220  RTC_DCHECK_RUN_ON(task_queue_);
    221 
    222  TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
    223               "TaskQueuePacedSender::MaybeProcessPackets");
    224 
    225  if (is_shutdown_ || !is_started_) {
    226    return;
    227  }
    228 
    229  // Protects against re-entry from transport feedback calling into the task
    230  // queue pacer.
    231  RTC_DCHECK(!processing_packets_);
    232  processing_packets_ = true;
    233  absl::Cleanup cleanup = [this] {
    234    RTC_DCHECK_RUN_ON(task_queue_);
    235    processing_packets_ = false;
    236  };
    237 
    238  Timestamp next_send_time = pacing_controller_.NextSendTime();
    239  RTC_DCHECK(next_send_time.IsFinite());
    240  const Timestamp now = clock_->CurrentTime();
    241  TimeDelta early_execute_margin =
    242      pacing_controller_.IsProbing()
    243          ? PacingController::kMaxEarlyProbeProcessing
    244          : TimeDelta::Zero();
    245 
    246  // Process packets and update stats.
    247  while (next_send_time <= now + early_execute_margin) {
    248    pacing_controller_.ProcessPackets();
    249    next_send_time = pacing_controller_.NextSendTime();
    250    RTC_DCHECK(next_send_time.IsFinite());
    251 
    252    // Probing state could change. Get margin after process packets.
    253    early_execute_margin = pacing_controller_.IsProbing()
    254                               ? PacingController::kMaxEarlyProbeProcessing
    255                               : TimeDelta::Zero();
    256  }
    257  UpdateStats();
    258 
    259  // Ignore retired scheduled task, otherwise reset `next_process_time_`.
    260  if (scheduled_process_time.IsFinite()) {
    261    if (scheduled_process_time != next_process_time_) {
    262      return;
    263    }
    264    next_process_time_ = Timestamp::MinusInfinity();
    265  }
    266 
    267  // Do not hold back in probing.
    268  TimeDelta hold_back_window = TimeDelta::Zero();
    269  if (!pacing_controller_.IsProbing()) {
    270    hold_back_window = max_hold_back_window_;
    271    DataRate pacing_rate = pacing_controller_.pacing_rate();
    272    if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
    273        !pacing_rate.IsZero() &&
    274        packet_size_.filtered() != ExpFilter::kValueUndefined) {
    275      TimeDelta avg_packet_send_time =
    276          DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
    277      hold_back_window =
    278          std::min(hold_back_window,
    279                   avg_packet_send_time * max_hold_back_window_in_packets_);
    280    }
    281  }
    282 
    283  // Calculate next process time.
    284  TimeDelta time_to_next_process =
    285      std::max(hold_back_window, next_send_time - now - early_execute_margin);
    286  next_send_time = now + time_to_next_process;
    287 
    288  // If no in flight task or in flight task is later than `next_send_time`,
    289  // schedule a new one. Previous in flight task will be retired.
    290  if (next_process_time_.IsMinusInfinity() ||
    291      next_process_time_ > next_send_time) {
    292    // Prefer low precision if allowed and not probing.
    293    task_queue_->PostDelayedHighPrecisionTask(
    294        SafeTask(
    295            safety_.flag(),
    296            [this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
    297        time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
    298    next_process_time_ = next_send_time;
    299  }
    300 }
    301 
    302 void TaskQueuePacedSender::UpdateStats() {
    303  Stats new_stats;
    304  new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
    305  new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
    306  new_stats.oldest_packet_enqueue_time =
    307      pacing_controller_.OldestPacketEnqueueTime();
    308  new_stats.queue_size = pacing_controller_.QueueSizeData();
    309  OnStatsUpdated(new_stats);
    310 }
    311 
    312 TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
    313  RTC_DCHECK_RUN_ON(task_queue_);
    314  return current_stats_;
    315 }
    316 
    317 }  // namespace webrtc