prioritized_packet_queue.cc (17232B)
1 /* 2 * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/pacing/prioritized_packet_queue.h" 12 13 #include <algorithm> 14 #include <array> 15 #include <cstddef> 16 #include <cstdint> 17 #include <deque> 18 #include <memory> 19 #include <optional> 20 #include <utility> 21 22 #include "absl/container/inlined_vector.h" 23 #include "api/units/data_size.h" 24 #include "api/units/time_delta.h" 25 #include "api/units/timestamp.h" 26 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 27 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" 28 #include "rtc_base/checks.h" 29 #include "rtc_base/logging.h" 30 31 namespace webrtc { 32 namespace { 33 34 constexpr int kAudioPrioLevel = 0; 35 36 int GetPriorityForType( 37 RtpPacketMediaType type, 38 std::optional<RtpPacketToSend::OriginalType> original_type) { 39 // Lower number takes priority over higher. 40 switch (type) { 41 case RtpPacketMediaType::kAudio: 42 // Audio is always prioritized over other packet types. 43 return kAudioPrioLevel; 44 case RtpPacketMediaType::kRetransmission: 45 // Send retransmissions before new media. If original_type is set, audio 46 // retransmission is prioritized more than video retransmission. 47 if (original_type == RtpPacketToSend::OriginalType::kVideo) { 48 return kAudioPrioLevel + 2; 49 } 50 return kAudioPrioLevel + 1; 51 case RtpPacketMediaType::kVideo: 52 case RtpPacketMediaType::kForwardErrorCorrection: 53 // Video has "normal" priority, in the old speak. 54 // Send redundancy concurrently to video. If it is delayed it might have a 55 // lower chance of being useful. 56 return kAudioPrioLevel + 3; 57 case RtpPacketMediaType::kPadding: 58 // Packets that are in themselves likely useless, only sent to keep the 59 // BWE high. 60 return kAudioPrioLevel + 4; 61 } 62 RTC_CHECK_NOTREACHED(); 63 } 64 65 } // namespace 66 67 absl::InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels> 68 PrioritizedPacketQueue::ToTtlPerPrio(PacketQueueTTL packet_queue_ttl) { 69 absl::InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels> 70 ttl_per_prio(kNumPriorityLevels, TimeDelta::PlusInfinity()); 71 ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission, 72 RtpPacketToSend::OriginalType::kAudio)] = 73 packet_queue_ttl.audio_retransmission; 74 ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission, 75 RtpPacketToSend::OriginalType::kVideo)] = 76 packet_queue_ttl.video_retransmission; 77 ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kVideo, std::nullopt)] = 78 packet_queue_ttl.video; 79 return ttl_per_prio; 80 } 81 82 DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const { 83 return DataSize::Bytes(packet->payload_size() + packet->padding_size()); 84 } 85 86 PrioritizedPacketQueue::StreamQueue::StreamQueue(Timestamp creation_time) 87 : last_enqueue_time_(creation_time), num_keyframe_packets_(0) {} 88 89 bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet, 90 int priority_level) { 91 if (packet.packet->is_key_frame()) { 92 ++num_keyframe_packets_; 93 } 94 bool first_packet_at_level = packets_[priority_level].empty(); 95 packets_[priority_level].push_back(std::move(packet)); 96 return first_packet_at_level; 97 } 98 99 PrioritizedPacketQueue::QueuedPacket 100 PrioritizedPacketQueue::StreamQueue::DequeuePacket(int priority_level) { 101 RTC_DCHECK(!packets_[priority_level].empty()); 102 QueuedPacket packet = std::move(packets_[priority_level].front()); 103 packets_[priority_level].pop_front(); 104 if (packet.packet->is_key_frame()) { 105 RTC_DCHECK_GT(num_keyframe_packets_, 0); 106 --num_keyframe_packets_; 107 } 108 return packet; 109 } 110 111 bool PrioritizedPacketQueue::StreamQueue::HasPacketsAtPrio( 112 int priority_level) const { 113 return !packets_[priority_level].empty(); 114 } 115 116 bool PrioritizedPacketQueue::StreamQueue::IsEmpty() const { 117 for (const std::deque<QueuedPacket>& queue : packets_) { 118 if (!queue.empty()) { 119 return false; 120 } 121 } 122 return true; 123 } 124 125 Timestamp PrioritizedPacketQueue::StreamQueue::LeadingPacketEnqueueTime( 126 int priority_level) const { 127 RTC_DCHECK(!packets_[priority_level].empty()); 128 return packets_[priority_level].begin()->enqueue_time; 129 } 130 131 Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const { 132 return last_enqueue_time_; 133 } 134 135 std::array<std::deque<PrioritizedPacketQueue::QueuedPacket>, 136 PrioritizedPacketQueue::kNumPriorityLevels> 137 PrioritizedPacketQueue::StreamQueue::DequeueAll() { 138 std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio; 139 for (int i = 0; i < kNumPriorityLevels; ++i) { 140 packets_by_prio[i].swap(packets_[i]); 141 } 142 num_keyframe_packets_ = 0; 143 return packets_by_prio; 144 } 145 146 PrioritizedPacketQueue::PrioritizedPacketQueue( 147 Timestamp creation_time, 148 bool prioritize_audio_retransmission, 149 PacketQueueTTL packet_queue_ttl) 150 : prioritize_audio_retransmission_(prioritize_audio_retransmission), 151 time_to_live_per_prio_(ToTtlPerPrio(packet_queue_ttl)), 152 queue_time_sum_(TimeDelta::Zero()), 153 pause_time_sum_(TimeDelta::Zero()), 154 size_packets_(0), 155 size_packets_per_media_type_({}), 156 size_payload_(DataSize::Zero()), 157 last_update_time_(creation_time), 158 paused_(false), 159 last_culling_time_(creation_time), 160 top_active_prio_level_(-1) {} 161 162 void PrioritizedPacketQueue::Push(Timestamp enqueue_time, 163 std::unique_ptr<RtpPacketToSend> packet) { 164 StreamQueue* stream_queue; 165 auto [it, inserted] = streams_.emplace(packet->Ssrc(), nullptr); 166 if (inserted) { 167 it->second = std::make_unique<StreamQueue>(enqueue_time); 168 } 169 stream_queue = it->second.get(); 170 171 auto enqueue_time_iterator = 172 enqueue_times_.insert(enqueue_times_.end(), enqueue_time); 173 RTC_DCHECK(packet->packet_type().has_value()); 174 RtpPacketMediaType packet_type = packet->packet_type().value(); 175 int prio_level = 176 GetPriorityForType(packet_type, prioritize_audio_retransmission_ 177 ? packet->original_packet_type() 178 : std::nullopt); 179 PurgeOldPacketsAtPriorityLevel(prio_level, enqueue_time); 180 RTC_DCHECK_GE(prio_level, 0); 181 RTC_DCHECK_LT(prio_level, kNumPriorityLevels); 182 QueuedPacket queued_packed = {.packet = std::move(packet), 183 .enqueue_time = enqueue_time, 184 .enqueue_time_iterator = enqueue_time_iterator}; 185 // In order to figure out how much time a packet has spent in the queue 186 // while not in a paused state, we subtract the total amount of time the 187 // queue has been paused so far, and when the packet is popped we subtract 188 // the total amount of time the queue has been paused at that moment. This 189 // way we subtract the total amount of time the packet has spent in the 190 // queue while in a paused state. 191 UpdateAverageQueueTime(enqueue_time); 192 queued_packed.enqueue_time -= pause_time_sum_; 193 ++size_packets_; 194 ++size_packets_per_media_type_[static_cast<size_t>(packet_type)]; 195 size_payload_ += queued_packed.PacketSize(); 196 197 if (stream_queue->EnqueuePacket(std::move(queued_packed), prio_level)) { 198 // Number packets at `prio_level` for this steam is now non-zero. 199 streams_by_prio_[prio_level].push_back(stream_queue); 200 } 201 if (top_active_prio_level_ < 0 || prio_level < top_active_prio_level_) { 202 top_active_prio_level_ = prio_level; 203 } 204 205 static constexpr TimeDelta kTimeout = TimeDelta::Millis(500); 206 if (enqueue_time - last_culling_time_ > kTimeout) { 207 for (auto stream_it = streams_.begin(); stream_it != streams_.end();) { 208 if (stream_it->second->IsEmpty() && 209 stream_it->second->LastEnqueueTime() + kTimeout < enqueue_time) { 210 streams_.erase(stream_it++); 211 } else { 212 ++stream_it; 213 } 214 } 215 last_culling_time_ = enqueue_time; 216 } 217 } 218 219 std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() { 220 if (size_packets_ == 0) { 221 return nullptr; 222 } 223 224 RTC_DCHECK_GE(top_active_prio_level_, 0); 225 StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front(); 226 QueuedPacket packet = stream_queue.DequeuePacket(top_active_prio_level_); 227 DequeuePacketInternal(packet); 228 229 // Remove StreamQueue from head of fifo-queue for this prio level, and 230 // and add it to the end if it still has packets. 231 streams_by_prio_[top_active_prio_level_].pop_front(); 232 if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) { 233 streams_by_prio_[top_active_prio_level_].push_back(&stream_queue); 234 } else { 235 MaybeUpdateTopPrioLevel(); 236 } 237 238 return std::move(packet.packet); 239 } 240 241 int PrioritizedPacketQueue::SizeInPackets() const { 242 return size_packets_; 243 } 244 245 DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const { 246 return size_payload_; 247 } 248 249 bool PrioritizedPacketQueue::Empty() const { 250 return size_packets_ == 0; 251 } 252 253 const std::array<int, kNumMediaTypes>& 254 PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const { 255 return size_packets_per_media_type_; 256 } 257 258 Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime( 259 RtpPacketMediaType type) const { 260 RTC_DCHECK(type != RtpPacketMediaType::kRetransmission); 261 const int priority_level = GetPriorityForType(type, std::nullopt); 262 if (streams_by_prio_[priority_level].empty()) { 263 return Timestamp::MinusInfinity(); 264 } 265 return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime( 266 priority_level); 267 } 268 269 Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTimeForRetransmission() 270 const { 271 if (!prioritize_audio_retransmission_) { 272 const int priority_level = 273 GetPriorityForType(RtpPacketMediaType::kRetransmission, std::nullopt); 274 if (streams_by_prio_[priority_level].empty()) { 275 return Timestamp::PlusInfinity(); 276 } 277 return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime( 278 priority_level); 279 } 280 const int audio_priority_level = 281 GetPriorityForType(RtpPacketMediaType::kRetransmission, 282 RtpPacketToSend::OriginalType::kAudio); 283 const int video_priority_level = 284 GetPriorityForType(RtpPacketMediaType::kRetransmission, 285 RtpPacketToSend::OriginalType::kVideo); 286 287 Timestamp next_audio = 288 streams_by_prio_[audio_priority_level].empty() 289 ? Timestamp::PlusInfinity() 290 : streams_by_prio_[audio_priority_level] 291 .front() 292 ->LeadingPacketEnqueueTime(audio_priority_level); 293 Timestamp next_video = 294 streams_by_prio_[video_priority_level].empty() 295 ? Timestamp::PlusInfinity() 296 : streams_by_prio_[video_priority_level] 297 .front() 298 ->LeadingPacketEnqueueTime(video_priority_level); 299 return std::min(next_audio, next_video); 300 } 301 302 Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const { 303 return enqueue_times_.empty() ? Timestamp::MinusInfinity() 304 : enqueue_times_.front(); 305 } 306 307 TimeDelta PrioritizedPacketQueue::AverageQueueTime() const { 308 if (size_packets_ == 0) { 309 return TimeDelta::Zero(); 310 } 311 return queue_time_sum_ / size_packets_; 312 } 313 314 void PrioritizedPacketQueue::UpdateAverageQueueTime(Timestamp now) { 315 RTC_CHECK_GE(now, last_update_time_); 316 if (now == last_update_time_) { 317 return; 318 } 319 320 TimeDelta delta = now - last_update_time_; 321 322 if (paused_) { 323 pause_time_sum_ += delta; 324 } else { 325 queue_time_sum_ += delta * size_packets_; 326 } 327 328 last_update_time_ = now; 329 } 330 331 void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) { 332 UpdateAverageQueueTime(now); 333 paused_ = paused; 334 } 335 336 void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) { 337 auto kv = streams_.find(ssrc); 338 if (kv != streams_.end()) { 339 // Dequeue all packets from the queue for this SSRC. 340 StreamQueue& queue = *kv->second; 341 std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio = 342 queue.DequeueAll(); 343 for (int i = 0; i < kNumPriorityLevels; ++i) { 344 std::deque<QueuedPacket>& packet_queue = packets_by_prio[i]; 345 if (packet_queue.empty()) { 346 continue; 347 } 348 349 // First erase all packets at this prio level. 350 while (!packet_queue.empty()) { 351 QueuedPacket packet = std::move(packet_queue.front()); 352 packet_queue.pop_front(); 353 DequeuePacketInternal(packet); 354 } 355 356 // Next, deregister this `StreamQueue` from the round-robin tables. 357 RTC_DCHECK(!streams_by_prio_[i].empty()); 358 if (streams_by_prio_[i].size() == 1) { 359 // This is the last and only queue that had packets for this prio level. 360 // Update the global top prio level if neccessary. 361 RTC_DCHECK(streams_by_prio_[i].front() == &queue); 362 streams_by_prio_[i].pop_front(); 363 } else { 364 // More than stream had packets at this prio level, filter this one out. 365 std::deque<StreamQueue*> filtered_queue; 366 for (StreamQueue* queue_ptr : streams_by_prio_[i]) { 367 if (queue_ptr != &queue) { 368 filtered_queue.push_back(queue_ptr); 369 } 370 } 371 streams_by_prio_[i].swap(filtered_queue); 372 } 373 } 374 } 375 MaybeUpdateTopPrioLevel(); 376 } 377 378 bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const { 379 auto it = streams_.find(ssrc); 380 if (it != streams_.end()) { 381 return it->second->has_keyframe_packets(); 382 } 383 return false; 384 } 385 386 void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) { 387 --size_packets_; 388 RTC_DCHECK(packet.packet->packet_type().has_value()); 389 RtpPacketMediaType packet_type = packet.packet->packet_type().value(); 390 --size_packets_per_media_type_[static_cast<size_t>(packet_type)]; 391 RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)], 392 0); 393 size_payload_ -= packet.PacketSize(); 394 395 // Calculate the total amount of time spent by this packet in the queue 396 // while in a non-paused state. Note that the `pause_time_sum_ms_` was 397 // subtracted from `packet.enqueue_time_ms` when the packet was pushed, and 398 // by subtracting it now we effectively remove the time spent in in the 399 // queue while in a paused state. 400 TimeDelta time_in_non_paused_state = 401 last_update_time_ - packet.enqueue_time - pause_time_sum_; 402 queue_time_sum_ -= time_in_non_paused_state; 403 404 // Set the time spent in the send queue, which is the per-packet equivalent of 405 // totalPacketSendDelay. The notion of being paused is an implementation 406 // detail that we do not want to expose, so it makes sense to report the 407 // metric excluding the pause time. This also avoids spikes in the metric. 408 // https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay 409 packet.packet->set_time_in_send_queue(time_in_non_paused_state); 410 411 RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero()); 412 413 RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end()); 414 enqueue_times_.erase(packet.enqueue_time_iterator); 415 } 416 417 void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() { 418 if (top_active_prio_level_ != -1 && 419 !streams_by_prio_[top_active_prio_level_].empty()) { 420 return; 421 } 422 // No stream queues have packets at top_active_prio_level_, find top priority 423 // that is not empty. 424 for (int i = 0; i < kNumPriorityLevels; ++i) { 425 PurgeOldPacketsAtPriorityLevel(i, last_update_time_); 426 if (!streams_by_prio_[i].empty()) { 427 top_active_prio_level_ = i; 428 break; 429 } 430 } 431 if (size_packets_ == 0) { 432 // There are no packets left to send. Last packet may have been purged. Prio 433 // will change when a new packet is pushed. 434 top_active_prio_level_ = -1; 435 } 436 } 437 438 void PrioritizedPacketQueue::PurgeOldPacketsAtPriorityLevel(int prio_level, 439 Timestamp now) { 440 RTC_DCHECK(prio_level >= 0 && prio_level < kNumPriorityLevels); 441 TimeDelta time_to_live = time_to_live_per_prio_[prio_level]; 442 if (time_to_live.IsInfinite()) { 443 return; 444 } 445 446 std::deque<StreamQueue*>& queues = streams_by_prio_[prio_level]; 447 auto iter = queues.begin(); 448 while (iter != queues.end()) { 449 StreamQueue* queue_ptr = *iter; 450 while (queue_ptr->HasPacketsAtPrio(prio_level) && 451 (now - queue_ptr->LeadingPacketEnqueueTime(prio_level)) > 452 time_to_live) { 453 QueuedPacket packet = queue_ptr->DequeuePacket(prio_level); 454 RTC_LOG(LS_INFO) << "Dropping old packet on SSRC: " 455 << packet.packet->Ssrc() 456 << " seq:" << packet.packet->SequenceNumber() 457 << " time in queue:" << (now - packet.enqueue_time).ms() 458 << " ms"; 459 DequeuePacketInternal(packet); 460 } 461 if (!queue_ptr->HasPacketsAtPrio(prio_level)) { 462 iter = queues.erase(iter); 463 } else { 464 ++iter; 465 } 466 } 467 } 468 469 } // namespace webrtc