tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

prioritized_packet_queue.cc (17232B)


      1 /*
      2 *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
      3 *
      4 *  Use of this source code is governed by a BSD-style license
      5 *  that can be found in the LICENSE file in the root of the source
      6 *  tree. An additional intellectual property rights grant can be found
      7 *  in the file PATENTS.  All contributing project authors may
      8 *  be found in the AUTHORS file in the root of the source tree.
      9 */
     10 
     11 #include "modules/pacing/prioritized_packet_queue.h"
     12 
     13 #include <algorithm>
     14 #include <array>
     15 #include <cstddef>
     16 #include <cstdint>
     17 #include <deque>
     18 #include <memory>
     19 #include <optional>
     20 #include <utility>
     21 
     22 #include "absl/container/inlined_vector.h"
     23 #include "api/units/data_size.h"
     24 #include "api/units/time_delta.h"
     25 #include "api/units/timestamp.h"
     26 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
     27 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
     28 #include "rtc_base/checks.h"
     29 #include "rtc_base/logging.h"
     30 
     31 namespace webrtc {
     32 namespace {
     33 
     34 constexpr int kAudioPrioLevel = 0;
     35 
     36 int GetPriorityForType(
     37    RtpPacketMediaType type,
     38    std::optional<RtpPacketToSend::OriginalType> original_type) {
     39  // Lower number takes priority over higher.
     40  switch (type) {
     41    case RtpPacketMediaType::kAudio:
     42      // Audio is always prioritized over other packet types.
     43      return kAudioPrioLevel;
     44    case RtpPacketMediaType::kRetransmission:
     45      // Send retransmissions before new media. If original_type is set, audio
     46      // retransmission is prioritized more than video retransmission.
     47      if (original_type == RtpPacketToSend::OriginalType::kVideo) {
     48        return kAudioPrioLevel + 2;
     49      }
     50      return kAudioPrioLevel + 1;
     51    case RtpPacketMediaType::kVideo:
     52    case RtpPacketMediaType::kForwardErrorCorrection:
     53      // Video has "normal" priority, in the old speak.
     54      // Send redundancy concurrently to video. If it is delayed it might have a
     55      // lower chance of being useful.
     56      return kAudioPrioLevel + 3;
     57    case RtpPacketMediaType::kPadding:
     58      // Packets that are in themselves likely useless, only sent to keep the
     59      // BWE high.
     60      return kAudioPrioLevel + 4;
     61  }
     62  RTC_CHECK_NOTREACHED();
     63 }
     64 
     65 }  // namespace
     66 
     67 absl::InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels>
     68 PrioritizedPacketQueue::ToTtlPerPrio(PacketQueueTTL packet_queue_ttl) {
     69  absl::InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels>
     70      ttl_per_prio(kNumPriorityLevels, TimeDelta::PlusInfinity());
     71  ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission,
     72                                  RtpPacketToSend::OriginalType::kAudio)] =
     73      packet_queue_ttl.audio_retransmission;
     74  ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission,
     75                                  RtpPacketToSend::OriginalType::kVideo)] =
     76      packet_queue_ttl.video_retransmission;
     77  ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kVideo, std::nullopt)] =
     78      packet_queue_ttl.video;
     79  return ttl_per_prio;
     80 }
     81 
     82 DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const {
     83  return DataSize::Bytes(packet->payload_size() + packet->padding_size());
     84 }
     85 
     86 PrioritizedPacketQueue::StreamQueue::StreamQueue(Timestamp creation_time)
     87    : last_enqueue_time_(creation_time), num_keyframe_packets_(0) {}
     88 
     89 bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet,
     90                                                        int priority_level) {
     91  if (packet.packet->is_key_frame()) {
     92    ++num_keyframe_packets_;
     93  }
     94  bool first_packet_at_level = packets_[priority_level].empty();
     95  packets_[priority_level].push_back(std::move(packet));
     96  return first_packet_at_level;
     97 }
     98 
     99 PrioritizedPacketQueue::QueuedPacket
    100 PrioritizedPacketQueue::StreamQueue::DequeuePacket(int priority_level) {
    101  RTC_DCHECK(!packets_[priority_level].empty());
    102  QueuedPacket packet = std::move(packets_[priority_level].front());
    103  packets_[priority_level].pop_front();
    104  if (packet.packet->is_key_frame()) {
    105    RTC_DCHECK_GT(num_keyframe_packets_, 0);
    106    --num_keyframe_packets_;
    107  }
    108  return packet;
    109 }
    110 
    111 bool PrioritizedPacketQueue::StreamQueue::HasPacketsAtPrio(
    112    int priority_level) const {
    113  return !packets_[priority_level].empty();
    114 }
    115 
    116 bool PrioritizedPacketQueue::StreamQueue::IsEmpty() const {
    117  for (const std::deque<QueuedPacket>& queue : packets_) {
    118    if (!queue.empty()) {
    119      return false;
    120    }
    121  }
    122  return true;
    123 }
    124 
    125 Timestamp PrioritizedPacketQueue::StreamQueue::LeadingPacketEnqueueTime(
    126    int priority_level) const {
    127  RTC_DCHECK(!packets_[priority_level].empty());
    128  return packets_[priority_level].begin()->enqueue_time;
    129 }
    130 
    131 Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const {
    132  return last_enqueue_time_;
    133 }
    134 
    135 std::array<std::deque<PrioritizedPacketQueue::QueuedPacket>,
    136           PrioritizedPacketQueue::kNumPriorityLevels>
    137 PrioritizedPacketQueue::StreamQueue::DequeueAll() {
    138  std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio;
    139  for (int i = 0; i < kNumPriorityLevels; ++i) {
    140    packets_by_prio[i].swap(packets_[i]);
    141  }
    142  num_keyframe_packets_ = 0;
    143  return packets_by_prio;
    144 }
    145 
    146 PrioritizedPacketQueue::PrioritizedPacketQueue(
    147    Timestamp creation_time,
    148    bool prioritize_audio_retransmission,
    149    PacketQueueTTL packet_queue_ttl)
    150    : prioritize_audio_retransmission_(prioritize_audio_retransmission),
    151      time_to_live_per_prio_(ToTtlPerPrio(packet_queue_ttl)),
    152      queue_time_sum_(TimeDelta::Zero()),
    153      pause_time_sum_(TimeDelta::Zero()),
    154      size_packets_(0),
    155      size_packets_per_media_type_({}),
    156      size_payload_(DataSize::Zero()),
    157      last_update_time_(creation_time),
    158      paused_(false),
    159      last_culling_time_(creation_time),
    160      top_active_prio_level_(-1) {}
    161 
    162 void PrioritizedPacketQueue::Push(Timestamp enqueue_time,
    163                                  std::unique_ptr<RtpPacketToSend> packet) {
    164  StreamQueue* stream_queue;
    165  auto [it, inserted] = streams_.emplace(packet->Ssrc(), nullptr);
    166  if (inserted) {
    167    it->second = std::make_unique<StreamQueue>(enqueue_time);
    168  }
    169  stream_queue = it->second.get();
    170 
    171  auto enqueue_time_iterator =
    172      enqueue_times_.insert(enqueue_times_.end(), enqueue_time);
    173  RTC_DCHECK(packet->packet_type().has_value());
    174  RtpPacketMediaType packet_type = packet->packet_type().value();
    175  int prio_level =
    176      GetPriorityForType(packet_type, prioritize_audio_retransmission_
    177                                          ? packet->original_packet_type()
    178                                          : std::nullopt);
    179  PurgeOldPacketsAtPriorityLevel(prio_level, enqueue_time);
    180  RTC_DCHECK_GE(prio_level, 0);
    181  RTC_DCHECK_LT(prio_level, kNumPriorityLevels);
    182  QueuedPacket queued_packed = {.packet = std::move(packet),
    183                                .enqueue_time = enqueue_time,
    184                                .enqueue_time_iterator = enqueue_time_iterator};
    185  // In order to figure out how much time a packet has spent in the queue
    186  // while not in a paused state, we subtract the total amount of time the
    187  // queue has been paused so far, and when the packet is popped we subtract
    188  // the total amount of time the queue has been paused at that moment. This
    189  // way we subtract the total amount of time the packet has spent in the
    190  // queue while in a paused state.
    191  UpdateAverageQueueTime(enqueue_time);
    192  queued_packed.enqueue_time -= pause_time_sum_;
    193  ++size_packets_;
    194  ++size_packets_per_media_type_[static_cast<size_t>(packet_type)];
    195  size_payload_ += queued_packed.PacketSize();
    196 
    197  if (stream_queue->EnqueuePacket(std::move(queued_packed), prio_level)) {
    198    // Number packets at `prio_level` for this steam is now non-zero.
    199    streams_by_prio_[prio_level].push_back(stream_queue);
    200  }
    201  if (top_active_prio_level_ < 0 || prio_level < top_active_prio_level_) {
    202    top_active_prio_level_ = prio_level;
    203  }
    204 
    205  static constexpr TimeDelta kTimeout = TimeDelta::Millis(500);
    206  if (enqueue_time - last_culling_time_ > kTimeout) {
    207    for (auto stream_it = streams_.begin(); stream_it != streams_.end();) {
    208      if (stream_it->second->IsEmpty() &&
    209          stream_it->second->LastEnqueueTime() + kTimeout < enqueue_time) {
    210        streams_.erase(stream_it++);
    211      } else {
    212        ++stream_it;
    213      }
    214    }
    215    last_culling_time_ = enqueue_time;
    216  }
    217 }
    218 
    219 std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() {
    220  if (size_packets_ == 0) {
    221    return nullptr;
    222  }
    223 
    224  RTC_DCHECK_GE(top_active_prio_level_, 0);
    225  StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front();
    226  QueuedPacket packet = stream_queue.DequeuePacket(top_active_prio_level_);
    227  DequeuePacketInternal(packet);
    228 
    229  // Remove StreamQueue from head of fifo-queue for this prio level, and
    230  // and add it to the end if it still has packets.
    231  streams_by_prio_[top_active_prio_level_].pop_front();
    232  if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) {
    233    streams_by_prio_[top_active_prio_level_].push_back(&stream_queue);
    234  } else {
    235    MaybeUpdateTopPrioLevel();
    236  }
    237 
    238  return std::move(packet.packet);
    239 }
    240 
    241 int PrioritizedPacketQueue::SizeInPackets() const {
    242  return size_packets_;
    243 }
    244 
    245 DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const {
    246  return size_payload_;
    247 }
    248 
    249 bool PrioritizedPacketQueue::Empty() const {
    250  return size_packets_ == 0;
    251 }
    252 
    253 const std::array<int, kNumMediaTypes>&
    254 PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const {
    255  return size_packets_per_media_type_;
    256 }
    257 
    258 Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime(
    259    RtpPacketMediaType type) const {
    260  RTC_DCHECK(type != RtpPacketMediaType::kRetransmission);
    261  const int priority_level = GetPriorityForType(type, std::nullopt);
    262  if (streams_by_prio_[priority_level].empty()) {
    263    return Timestamp::MinusInfinity();
    264  }
    265  return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime(
    266      priority_level);
    267 }
    268 
    269 Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTimeForRetransmission()
    270    const {
    271  if (!prioritize_audio_retransmission_) {
    272    const int priority_level =
    273        GetPriorityForType(RtpPacketMediaType::kRetransmission, std::nullopt);
    274    if (streams_by_prio_[priority_level].empty()) {
    275      return Timestamp::PlusInfinity();
    276    }
    277    return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime(
    278        priority_level);
    279  }
    280  const int audio_priority_level =
    281      GetPriorityForType(RtpPacketMediaType::kRetransmission,
    282                         RtpPacketToSend::OriginalType::kAudio);
    283  const int video_priority_level =
    284      GetPriorityForType(RtpPacketMediaType::kRetransmission,
    285                         RtpPacketToSend::OriginalType::kVideo);
    286 
    287  Timestamp next_audio =
    288      streams_by_prio_[audio_priority_level].empty()
    289          ? Timestamp::PlusInfinity()
    290          : streams_by_prio_[audio_priority_level]
    291                .front()
    292                ->LeadingPacketEnqueueTime(audio_priority_level);
    293  Timestamp next_video =
    294      streams_by_prio_[video_priority_level].empty()
    295          ? Timestamp::PlusInfinity()
    296          : streams_by_prio_[video_priority_level]
    297                .front()
    298                ->LeadingPacketEnqueueTime(video_priority_level);
    299  return std::min(next_audio, next_video);
    300 }
    301 
    302 Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const {
    303  return enqueue_times_.empty() ? Timestamp::MinusInfinity()
    304                                : enqueue_times_.front();
    305 }
    306 
    307 TimeDelta PrioritizedPacketQueue::AverageQueueTime() const {
    308  if (size_packets_ == 0) {
    309    return TimeDelta::Zero();
    310  }
    311  return queue_time_sum_ / size_packets_;
    312 }
    313 
    314 void PrioritizedPacketQueue::UpdateAverageQueueTime(Timestamp now) {
    315  RTC_CHECK_GE(now, last_update_time_);
    316  if (now == last_update_time_) {
    317    return;
    318  }
    319 
    320  TimeDelta delta = now - last_update_time_;
    321 
    322  if (paused_) {
    323    pause_time_sum_ += delta;
    324  } else {
    325    queue_time_sum_ += delta * size_packets_;
    326  }
    327 
    328  last_update_time_ = now;
    329 }
    330 
    331 void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) {
    332  UpdateAverageQueueTime(now);
    333  paused_ = paused;
    334 }
    335 
    336 void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
    337  auto kv = streams_.find(ssrc);
    338  if (kv != streams_.end()) {
    339    // Dequeue all packets from the queue for this SSRC.
    340    StreamQueue& queue = *kv->second;
    341    std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio =
    342        queue.DequeueAll();
    343    for (int i = 0; i < kNumPriorityLevels; ++i) {
    344      std::deque<QueuedPacket>& packet_queue = packets_by_prio[i];
    345      if (packet_queue.empty()) {
    346        continue;
    347      }
    348 
    349      // First erase all packets at this prio level.
    350      while (!packet_queue.empty()) {
    351        QueuedPacket packet = std::move(packet_queue.front());
    352        packet_queue.pop_front();
    353        DequeuePacketInternal(packet);
    354      }
    355 
    356      // Next, deregister this `StreamQueue` from the round-robin tables.
    357      RTC_DCHECK(!streams_by_prio_[i].empty());
    358      if (streams_by_prio_[i].size() == 1) {
    359        // This is the last and only queue that had packets for this prio level.
    360        // Update the global top prio level if neccessary.
    361        RTC_DCHECK(streams_by_prio_[i].front() == &queue);
    362        streams_by_prio_[i].pop_front();
    363      } else {
    364        // More than stream had packets at this prio level, filter this one out.
    365        std::deque<StreamQueue*> filtered_queue;
    366        for (StreamQueue* queue_ptr : streams_by_prio_[i]) {
    367          if (queue_ptr != &queue) {
    368            filtered_queue.push_back(queue_ptr);
    369          }
    370        }
    371        streams_by_prio_[i].swap(filtered_queue);
    372      }
    373    }
    374  }
    375  MaybeUpdateTopPrioLevel();
    376 }
    377 
    378 bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const {
    379  auto it = streams_.find(ssrc);
    380  if (it != streams_.end()) {
    381    return it->second->has_keyframe_packets();
    382  }
    383  return false;
    384 }
    385 
    386 void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) {
    387  --size_packets_;
    388  RTC_DCHECK(packet.packet->packet_type().has_value());
    389  RtpPacketMediaType packet_type = packet.packet->packet_type().value();
    390  --size_packets_per_media_type_[static_cast<size_t>(packet_type)];
    391  RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
    392                0);
    393  size_payload_ -= packet.PacketSize();
    394 
    395  // Calculate the total amount of time spent by this packet in the queue
    396  // while in a non-paused state. Note that the `pause_time_sum_ms_` was
    397  // subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
    398  // by subtracting it now we effectively remove the time spent in in the
    399  // queue while in a paused state.
    400  TimeDelta time_in_non_paused_state =
    401      last_update_time_ - packet.enqueue_time - pause_time_sum_;
    402  queue_time_sum_ -= time_in_non_paused_state;
    403 
    404  // Set the time spent in the send queue, which is the per-packet equivalent of
    405  // totalPacketSendDelay. The notion of being paused is an implementation
    406  // detail that we do not want to expose, so it makes sense to report the
    407  // metric excluding the pause time. This also avoids spikes in the metric.
    408  // https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
    409  packet.packet->set_time_in_send_queue(time_in_non_paused_state);
    410 
    411  RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
    412 
    413  RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
    414  enqueue_times_.erase(packet.enqueue_time_iterator);
    415 }
    416 
    417 void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() {
    418  if (top_active_prio_level_ != -1 &&
    419      !streams_by_prio_[top_active_prio_level_].empty()) {
    420    return;
    421  }
    422  // No stream queues have packets at top_active_prio_level_, find top priority
    423  // that is not empty.
    424  for (int i = 0; i < kNumPriorityLevels; ++i) {
    425    PurgeOldPacketsAtPriorityLevel(i, last_update_time_);
    426    if (!streams_by_prio_[i].empty()) {
    427      top_active_prio_level_ = i;
    428      break;
    429    }
    430  }
    431  if (size_packets_ == 0) {
    432    // There are no packets left to send. Last packet may have been purged. Prio
    433    // will change when a new packet is pushed.
    434    top_active_prio_level_ = -1;
    435  }
    436 }
    437 
    438 void PrioritizedPacketQueue::PurgeOldPacketsAtPriorityLevel(int prio_level,
    439                                                            Timestamp now) {
    440  RTC_DCHECK(prio_level >= 0 && prio_level < kNumPriorityLevels);
    441  TimeDelta time_to_live = time_to_live_per_prio_[prio_level];
    442  if (time_to_live.IsInfinite()) {
    443    return;
    444  }
    445 
    446  std::deque<StreamQueue*>& queues = streams_by_prio_[prio_level];
    447  auto iter = queues.begin();
    448  while (iter != queues.end()) {
    449    StreamQueue* queue_ptr = *iter;
    450    while (queue_ptr->HasPacketsAtPrio(prio_level) &&
    451           (now - queue_ptr->LeadingPacketEnqueueTime(prio_level)) >
    452               time_to_live) {
    453      QueuedPacket packet = queue_ptr->DequeuePacket(prio_level);
    454      RTC_LOG(LS_INFO) << "Dropping old packet on SSRC: "
    455                       << packet.packet->Ssrc()
    456                       << " seq:" << packet.packet->SequenceNumber()
    457                       << " time in queue:" << (now - packet.enqueue_time).ms()
    458                       << " ms";
    459      DequeuePacketInternal(packet);
    460    }
    461    if (!queue_ptr->HasPacketsAtPrio(prio_level)) {
    462      iter = queues.erase(iter);
    463    } else {
    464      ++iter;
    465    }
    466  }
    467 }
    468 
    469 }  // namespace webrtc