pacing_controller.cc (26878B)
1 /* 2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/pacing/pacing_controller.h" 12 13 #include <algorithm> 14 #include <array> 15 #include <cstddef> 16 #include <cstdint> 17 #include <memory> 18 #include <optional> 19 #include <utility> 20 #include <vector> 21 22 #include "absl/cleanup/cleanup.h" 23 #include "absl/strings/string_view.h" 24 #include "api/array_view.h" 25 #include "api/field_trials_view.h" 26 #include "api/transport/network_types.h" 27 #include "api/units/data_rate.h" 28 #include "api/units/data_size.h" 29 #include "api/units/time_delta.h" 30 #include "api/units/timestamp.h" 31 #include "modules/pacing/bitrate_prober.h" 32 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 33 #include "rtc_base/checks.h" 34 #include "rtc_base/logging.h" 35 #include "rtc_base/numerics/safe_conversions.h" 36 #include "system_wrappers/include/clock.h" 37 38 namespace webrtc { 39 namespace { 40 constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis(500); 41 // TODO(sprang): Consider dropping this limit. 42 // The maximum debt level, in terms of time, capped when sending packets. 43 constexpr TimeDelta kMaxDebtInTime = TimeDelta::Millis(500); 44 constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2); 45 } // namespace 46 47 const TimeDelta PacingController::kPausedProcessInterval = 48 kCongestedPacketInterval; 49 const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1); 50 const TimeDelta PacingController::kTargetPaddingDuration = TimeDelta::Millis(5); 51 const TimeDelta PacingController::kMaxPaddingReplayDuration = 52 TimeDelta::Millis(50); 53 const TimeDelta PacingController::kMaxEarlyProbeProcessing = 54 TimeDelta::Millis(1); 55 56 PacingController::PacingController(Clock* clock, 57 PacketSender* packet_sender, 58 const FieldTrialsView& field_trials, 59 Configuration configuration) 60 : clock_(clock), 61 packet_sender_(packet_sender), 62 drain_large_queues_(configuration.drain_large_queues && 63 !field_trials.IsDisabled("WebRTC-Pacer-DrainQueue")), 64 send_padding_if_silent_( 65 field_trials.IsEnabled("WebRTC-Pacer-PadInSilence")), 66 pace_audio_(field_trials.IsEnabled("WebRTC-Pacer-BlockAudio")), 67 ignore_transport_overhead_( 68 field_trials.IsEnabled("WebRTC-Pacer-IgnoreTransportOverhead")), 69 fast_retransmissions_( 70 field_trials.IsEnabled("WebRTC-Pacer-FastRetransmissions")), 71 keyframe_flushing_( 72 configuration.keyframe_flushing || 73 field_trials.IsEnabled("WebRTC-Pacer-KeyframeFlushing")), 74 transport_overhead_per_packet_(DataSize::Zero()), 75 send_burst_interval_(configuration.send_burst_interval), 76 last_timestamp_(clock_->CurrentTime()), 77 paused_(false), 78 media_debt_(DataSize::Zero()), 79 padding_debt_(DataSize::Zero()), 80 pacing_rate_(DataRate::Zero()), 81 adjusted_media_rate_(DataRate::Zero()), 82 padding_rate_(DataRate::Zero()), 83 prober_(field_trials), 84 probing_send_failure_(false), 85 last_process_time_(clock->CurrentTime()), 86 last_send_time_(last_process_time_), 87 seen_first_packet_(false), 88 packet_queue_(/*creation_time=*/last_process_time_, 89 configuration.prioritize_audio_retransmission, 90 configuration.packet_queue_ttl), 91 congested_(false), 92 queue_time_limit_(configuration.queue_time_limit), 93 account_for_audio_(false), 94 include_overhead_(false), 95 circuit_breaker_threshold_(1 << 16) { 96 if (!drain_large_queues_) { 97 RTC_LOG(LS_WARNING) << "Pacer queues will not be drained," 98 "pushback experiment must be enabled."; 99 } 100 } 101 102 PacingController::~PacingController() = default; 103 104 void PacingController::CreateProbeClusters( 105 ArrayView<const ProbeClusterConfig> probe_cluster_configs) { 106 for (const ProbeClusterConfig probe_cluster_config : probe_cluster_configs) { 107 prober_.CreateProbeCluster(probe_cluster_config); 108 } 109 } 110 111 void PacingController::Pause() { 112 if (!paused_) 113 RTC_LOG(LS_INFO) << "PacedSender paused."; 114 paused_ = true; 115 packet_queue_.SetPauseState(true, CurrentTime()); 116 } 117 118 void PacingController::Resume() { 119 if (paused_) 120 RTC_LOG(LS_INFO) << "PacedSender resumed."; 121 paused_ = false; 122 packet_queue_.SetPauseState(false, CurrentTime()); 123 } 124 125 bool PacingController::IsPaused() const { 126 return paused_; 127 } 128 129 void PacingController::SetCongested(bool congested) { 130 if (congested_ && !congested) { 131 UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(CurrentTime())); 132 } 133 congested_ = congested; 134 } 135 136 void PacingController::SetCircuitBreakerThreshold(int num_iterations) { 137 circuit_breaker_threshold_ = num_iterations; 138 } 139 140 void PacingController::RemovePacketsForSsrc(uint32_t ssrc) { 141 packet_queue_.RemovePacketsForSsrc(ssrc); 142 } 143 144 bool PacingController::IsProbing() const { 145 return prober_.is_probing(); 146 } 147 148 Timestamp PacingController::CurrentTime() const { 149 Timestamp time = clock_->CurrentTime(); 150 if (time < last_timestamp_) { 151 RTC_LOG(LS_WARNING) 152 << "Non-monotonic clock behavior observed. Previous timestamp: " 153 << last_timestamp_.ms() << ", new timestamp: " << time.ms(); 154 RTC_DCHECK_GE(time, last_timestamp_); 155 time = last_timestamp_; 156 } 157 last_timestamp_ = time; 158 return time; 159 } 160 161 void PacingController::SetProbingEnabled(bool enabled) { 162 RTC_CHECK(!seen_first_packet_); 163 prober_.SetEnabled(enabled); 164 } 165 166 void PacingController::SetPacingRates(DataRate pacing_rate, 167 DataRate padding_rate) { 168 RTC_CHECK_GT(pacing_rate, DataRate::Zero()); 169 RTC_CHECK_GE(padding_rate, DataRate::Zero()); 170 if (padding_rate > pacing_rate) { 171 RTC_LOG(LS_WARNING) << "Padding rate " << padding_rate.kbps() 172 << "kbps is higher than the pacing rate " 173 << pacing_rate.kbps() << "kbps, capping."; 174 padding_rate = pacing_rate; 175 } 176 177 if (pacing_rate > max_rate || padding_rate > max_rate) { 178 RTC_LOG(LS_WARNING) << "Very high pacing rates ( > " << max_rate.kbps() 179 << " kbps) configured: pacing = " << pacing_rate.kbps() 180 << " kbps, padding = " << padding_rate.kbps() 181 << " kbps."; 182 max_rate = std::max(pacing_rate, padding_rate) * 1.1; 183 } 184 pacing_rate_ = pacing_rate; 185 padding_rate_ = padding_rate; 186 MaybeUpdateMediaRateDueToLongQueue(CurrentTime()); 187 188 RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps=" << pacing_rate_.kbps() 189 << " padding_budget_kbps=" << padding_rate.kbps(); 190 } 191 192 void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) { 193 RTC_DCHECK(pacing_rate_ > DataRate::Zero()) 194 << "SetPacingRate must be called before InsertPacket."; 195 RTC_CHECK(packet->packet_type()); 196 197 if (keyframe_flushing_ && 198 packet->packet_type() == RtpPacketMediaType::kVideo && 199 packet->is_key_frame() && packet->is_first_packet_of_frame() && 200 !packet_queue_.HasKeyframePackets(packet->Ssrc())) { 201 // First packet of a keyframe (and no keyframe packets currently in the 202 // queue). Flush any pending packets currently in the queue for that stream 203 // in order to get the new keyframe out as quickly as possible. 204 packet_queue_.RemovePacketsForSsrc(packet->Ssrc()); 205 std::optional<uint32_t> rtx_ssrc = 206 packet_sender_->GetRtxSsrcForMedia(packet->Ssrc()); 207 if (rtx_ssrc) { 208 packet_queue_.RemovePacketsForSsrc(*rtx_ssrc); 209 } 210 } 211 212 prober_.OnIncomingPacket(DataSize::Bytes(packet->size())); 213 214 const Timestamp now = CurrentTime(); 215 if (packet_queue_.Empty()) { 216 // If queue is empty, we need to "fast-forward" the last process time, 217 // so that we don't use passed time as budget for sending the first new 218 // packet. 219 Timestamp target_process_time = now; 220 Timestamp next_send_time = NextSendTime(); 221 if (next_send_time.IsFinite()) { 222 // There was already a valid planned send time, such as a keep-alive. 223 // Use that as last process time only if it's prior to now. 224 target_process_time = std::min(now, next_send_time); 225 } 226 UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time)); 227 } 228 packet_queue_.Push(now, std::move(packet)); 229 seen_first_packet_ = true; 230 231 // Queue length has increased, check if we need to change the pacing rate. 232 MaybeUpdateMediaRateDueToLongQueue(now); 233 } 234 235 void PacingController::SetAccountForAudioPackets(bool account_for_audio) { 236 account_for_audio_ = account_for_audio; 237 } 238 239 void PacingController::SetIncludeOverhead() { 240 include_overhead_ = true; 241 } 242 243 void PacingController::SetTransportOverhead(DataSize overhead_per_packet) { 244 if (ignore_transport_overhead_) 245 return; 246 transport_overhead_per_packet_ = overhead_per_packet; 247 } 248 249 void PacingController::SetSendBurstInterval(TimeDelta burst_interval) { 250 send_burst_interval_ = burst_interval; 251 } 252 253 void PacingController::SetAllowProbeWithoutMediaPacket(bool allow) { 254 prober_.SetAllowProbeWithoutMediaPacket(allow); 255 } 256 257 TimeDelta PacingController::ExpectedQueueTime() const { 258 RTC_DCHECK_GT(adjusted_media_rate_, DataRate::Zero()); 259 return QueueSizeData() / adjusted_media_rate_; 260 } 261 262 size_t PacingController::QueueSizePackets() const { 263 return checked_cast<size_t>(packet_queue_.SizeInPackets()); 264 } 265 266 const std::array<int, kNumMediaTypes>& 267 PacingController::SizeInPacketsPerRtpPacketMediaType() const { 268 return packet_queue_.SizeInPacketsPerRtpPacketMediaType(); 269 } 270 271 DataSize PacingController::QueueSizeData() const { 272 DataSize size = packet_queue_.SizeInPayloadBytes(); 273 if (include_overhead_) { 274 size += static_cast<int64_t>(packet_queue_.SizeInPackets()) * 275 transport_overhead_per_packet_; 276 } 277 return size; 278 } 279 280 DataSize PacingController::CurrentBufferLevel() const { 281 return std::max(media_debt_, padding_debt_); 282 } 283 284 std::optional<Timestamp> PacingController::FirstSentPacketTime() const { 285 return first_sent_packet_time_; 286 } 287 288 Timestamp PacingController::OldestPacketEnqueueTime() const { 289 return packet_queue_.OldestEnqueueTime(); 290 } 291 292 TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { 293 // If no previous processing, or last process was "in the future" because of 294 // early probe processing, then there is no elapsed time to add budget for. 295 if (last_process_time_.IsMinusInfinity() || now < last_process_time_) { 296 return TimeDelta::Zero(); 297 } 298 TimeDelta elapsed_time = now - last_process_time_; 299 last_process_time_ = now; 300 if (elapsed_time > kMaxElapsedTime) { 301 RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time 302 << ") longer than expected, limiting to " 303 << kMaxElapsedTime; 304 elapsed_time = kMaxElapsedTime; 305 } 306 return elapsed_time; 307 } 308 309 bool PacingController::ShouldSendKeepalive(Timestamp now) const { 310 if (send_padding_if_silent_ || paused_ || congested_ || !seen_first_packet_) { 311 // We send a padding packet every 500 ms to ensure we won't get stuck in 312 // congested state due to no feedback being received. 313 if (now - last_send_time_ >= kCongestedPacketInterval) { 314 return true; 315 } 316 } 317 return false; 318 } 319 320 Timestamp PacingController::NextSendTime() const { 321 const Timestamp now = CurrentTime(); 322 Timestamp next_send_time = Timestamp::PlusInfinity(); 323 324 if (paused_) { 325 return last_send_time_ + kPausedProcessInterval; 326 } 327 328 // If probing is active, that always takes priority. 329 if (prober_.is_probing() && !probing_send_failure_) { 330 Timestamp probe_time = prober_.NextProbeTime(now); 331 if (!probe_time.IsPlusInfinity()) { 332 return probe_time.IsMinusInfinity() ? now : probe_time; 333 } 334 } 335 336 // If queue contains a packet which should not be paced, its target send time 337 // is the time at which it was enqueued. 338 Timestamp unpaced_send_time = NextUnpacedSendTime(); 339 if (unpaced_send_time.IsFinite()) { 340 return unpaced_send_time; 341 } 342 343 if (congested_ || !seen_first_packet_) { 344 // We need to at least send keep-alive packets with some interval. 345 return last_send_time_ + kCongestedPacketInterval; 346 } 347 348 if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { 349 // If packets are allowed to be sent in a burst, the 350 // debt is allowed to grow up to one packet more than what can be sent 351 // during 'send_burst_period_'. 352 TimeDelta drain_time = media_debt_ / adjusted_media_rate_; 353 // Ensure that a burst of sent packet is not larger than kMaxBurstSize in 354 // order to not risk overfilling socket buffers at high bitrate. 355 TimeDelta send_burst_interval = 356 std::min(send_burst_interval_, kMaxBurstSize / adjusted_media_rate_); 357 next_send_time = 358 last_process_time_ + 359 ((send_burst_interval > drain_time) ? TimeDelta::Zero() : drain_time); 360 } else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { 361 // If we _don't_ have pending packets, check how long until we have 362 // bandwidth for padding packets. Both media and padding debts must 363 // have been drained to do this. 364 RTC_DCHECK_GT(adjusted_media_rate_, DataRate::Zero()); 365 TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_, 366 padding_debt_ / padding_rate_); 367 368 if (drain_time.IsZero() && 369 (!media_debt_.IsZero() || !padding_debt_.IsZero())) { 370 // We have a non-zero debt, but drain time is smaller than tick size of 371 // TimeDelta, round it up to the smallest possible non-zero delta. 372 drain_time = TimeDelta::Micros(1); 373 } 374 next_send_time = last_process_time_ + drain_time; 375 } else { 376 // Nothing to do. 377 next_send_time = last_process_time_ + kPausedProcessInterval; 378 } 379 380 if (send_padding_if_silent_) { 381 next_send_time = 382 std::min(next_send_time, last_send_time_ + kPausedProcessInterval); 383 } 384 385 return next_send_time; 386 } 387 388 void PacingController::ProcessPackets() { 389 absl::Cleanup cleanup = [packet_sender = packet_sender_] { 390 packet_sender->OnBatchComplete(); 391 }; 392 const Timestamp now = CurrentTime(); 393 Timestamp target_send_time = now; 394 395 if (ShouldSendKeepalive(now)) { 396 DataSize keepalive_data_sent = DataSize::Zero(); 397 // We can not send padding unless a normal packet has first been sent. If 398 // we do, timestamps get messed up. 399 if (seen_first_packet_) { 400 std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets = 401 packet_sender_->GeneratePadding(DataSize::Bytes(1)); 402 for (auto& packet : keepalive_packets) { 403 keepalive_data_sent += 404 DataSize::Bytes(packet->payload_size() + packet->padding_size()); 405 packet_sender_->SendPacket(std::move(packet), PacedPacketInfo()); 406 for (auto& fec_packet : packet_sender_->FetchFec()) { 407 EnqueuePacket(std::move(fec_packet)); 408 } 409 } 410 } 411 OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now); 412 } 413 414 if (paused_) { 415 return; 416 } 417 418 TimeDelta early_execute_margin = 419 prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); 420 421 target_send_time = NextSendTime(); 422 if (now + early_execute_margin < target_send_time) { 423 // We are too early, but if queue is empty still allow draining some debt. 424 // Probing is allowed to be sent up to kMinSleepTime early. 425 UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now)); 426 return; 427 } 428 429 TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time); 430 431 if (elapsed_time > TimeDelta::Zero()) { 432 UpdateBudgetWithElapsedTime(elapsed_time); 433 } 434 435 PacedPacketInfo pacing_info; 436 DataSize recommended_probe_size = DataSize::Zero(); 437 bool is_probing = prober_.is_probing(); 438 if (is_probing) { 439 // Probe timing is sensitive, and handled explicitly by BitrateProber, so 440 // use actual send time rather than target. 441 pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo()); 442 if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) { 443 recommended_probe_size = prober_.RecommendedMinProbeSize(); 444 RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero()); 445 } else { 446 // No valid probe cluster returned, probe might have timed out. 447 is_probing = false; 448 } 449 } 450 451 DataSize data_sent = DataSize::Zero(); 452 int iteration = 0; 453 int packets_sent = 0; 454 int padding_packets_generated = 0; 455 for (; iteration < circuit_breaker_threshold_; ++iteration) { 456 // Fetch packet, so long as queue is not empty or budget is not 457 // exhausted. 458 std::unique_ptr<RtpPacketToSend> rtp_packet = 459 GetPendingPacket(pacing_info, target_send_time, now); 460 if (rtp_packet == nullptr) { 461 // No packet available to send, check if we should send padding. 462 if (now - target_send_time > kMaxPaddingReplayDuration) { 463 // The target send time is more than `kMaxPaddingReplayDuration` behind 464 // the real-time clock. This can happen if the clock is adjusted forward 465 // without `ProcessPackets()` having been called at the expected times. 466 target_send_time = now - kMaxPaddingReplayDuration; 467 last_process_time_ = std::max(last_process_time_, target_send_time); 468 } 469 470 DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent); 471 if (padding_to_add > DataSize::Zero()) { 472 std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets = 473 packet_sender_->GeneratePadding(padding_to_add); 474 if (!padding_packets.empty()) { 475 padding_packets_generated += padding_packets.size(); 476 for (auto& packet : padding_packets) { 477 EnqueuePacket(std::move(packet)); 478 } 479 // Continue loop to send the padding that was just added. 480 continue; 481 } else { 482 // Can't generate padding, still update padding budget for next send 483 // time. 484 UpdatePaddingBudgetWithSentData(padding_to_add); 485 } 486 } 487 // Can't fetch new packet and no padding to send, exit send loop. 488 break; 489 } else { 490 RTC_DCHECK(rtp_packet); 491 RTC_DCHECK(rtp_packet->packet_type().has_value()); 492 const RtpPacketMediaType packet_type = *rtp_packet->packet_type(); 493 DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() + 494 rtp_packet->padding_size()); 495 496 if (include_overhead_) { 497 packet_size += DataSize::Bytes(rtp_packet->headers_size()) + 498 transport_overhead_per_packet_; 499 } 500 501 packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); 502 for (auto& packet : packet_sender_->FetchFec()) { 503 EnqueuePacket(std::move(packet)); 504 } 505 data_sent += packet_size; 506 ++packets_sent; 507 508 // Send done, update send time. 509 OnPacketSent(packet_type, packet_size, now); 510 511 if (is_probing) { 512 pacing_info.probe_cluster_bytes_sent += packet_size.bytes(); 513 // If we are currently probing, we need to stop the send loop when we 514 // have reached the send target. 515 if (data_sent >= recommended_probe_size) { 516 break; 517 } 518 } 519 520 // Update target send time in case that are more packets that we are late 521 // in processing. 522 target_send_time = NextSendTime(); 523 if (target_send_time > now) { 524 // Exit loop if not probing. 525 if (!is_probing) { 526 break; 527 } 528 target_send_time = now; 529 } 530 UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time)); 531 } 532 } 533 534 if (iteration >= circuit_breaker_threshold_) { 535 // Circuit break activated. Log warning, adjust send time and return. 536 // TODO(sprang): Consider completely clearing state. 537 RTC_LOG(LS_ERROR) 538 << "PacingController exceeded max iterations in " 539 "send-loop. Debug info: " 540 << " packets sent = " << packets_sent 541 << ", padding packets generated = " << padding_packets_generated 542 << ", bytes sent = " << data_sent.bytes() 543 << ", probing = " << (is_probing ? "true" : "false") 544 << ", recommended_probe_size = " << recommended_probe_size.bytes() 545 << ", now = " << now.us() 546 << ", target_send_time = " << target_send_time.us() 547 << ", last_process_time = " << last_process_time_.us() 548 << ", last_send_time = " << last_send_time_.us() 549 << ", paused = " << (paused_ ? "true" : "false") 550 << ", media_debt = " << media_debt_.bytes() 551 << ", padding_debt = " << padding_debt_.bytes() 552 << ", pacing_rate = " << pacing_rate_.bps() 553 << ", adjusted_media_rate = " << adjusted_media_rate_.bps() 554 << ", padding_rate = " << padding_rate_.bps() 555 << ", queue size (packets) = " << packet_queue_.SizeInPackets() 556 << ", queue size (payload bytes) = " 557 << packet_queue_.SizeInPayloadBytes(); 558 last_send_time_ = now; 559 last_process_time_ = now; 560 return; 561 } 562 563 if (is_probing) { 564 probing_send_failure_ = data_sent == DataSize::Zero(); 565 if (!probing_send_failure_) { 566 prober_.ProbeSent(CurrentTime(), data_sent); 567 } 568 } 569 570 // Queue length has probably decreased, check if pacing rate needs to updated. 571 // Poll the time again, since we might have enqueued new fec/padding packets 572 // with a later timestamp than `now`. 573 MaybeUpdateMediaRateDueToLongQueue(CurrentTime()); 574 } 575 576 DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, 577 DataSize data_sent) const { 578 if (!packet_queue_.Empty()) { 579 // Actual payload available, no need to add padding. 580 return DataSize::Zero(); 581 } 582 583 if (congested_) { 584 // Don't add padding if congested, even if requested for probing. 585 return DataSize::Zero(); 586 } 587 588 if (!recommended_probe_size.IsZero()) { 589 if (recommended_probe_size > data_sent) { 590 return recommended_probe_size - data_sent; 591 } 592 return DataSize::Zero(); 593 } 594 595 if (padding_rate_ > DataRate::Zero() && padding_debt_ == DataSize::Zero()) { 596 return kTargetPaddingDuration * padding_rate_; 597 } 598 return DataSize::Zero(); 599 } 600 601 std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket( 602 const PacedPacketInfo& pacing_info, 603 Timestamp target_send_time, 604 Timestamp now) { 605 const bool is_probe = 606 pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe; 607 // If first packet in probe, insert a small padding packet so we have a 608 // more reliable start window for the rate estimation. 609 if (is_probe && pacing_info.probe_cluster_bytes_sent == 0) { 610 auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1)); 611 // If no RTP modules sending media are registered, we may not get a 612 // padding packet back. 613 if (!padding.empty()) { 614 // We should never get more than one padding packets with a requested 615 // size of 1 byte. 616 RTC_DCHECK_EQ(padding.size(), 1u); 617 return std::move(padding[0]); 618 } 619 } 620 621 if (packet_queue_.Empty()) { 622 return nullptr; 623 } 624 625 // First, check if there is any reason _not_ to send the next queued packet. 626 // Unpaced packets and probes are exempted from send checks. 627 if (NextUnpacedSendTime().IsInfinite() && !is_probe) { 628 if (congested_) { 629 // Don't send anything if congested. 630 return nullptr; 631 } 632 633 if (now <= target_send_time && send_burst_interval_.IsZero()) { 634 // We allow sending slightly early if we think that we would actually 635 // had been able to, had we been right on time - i.e. the current debt 636 // is not more than would be reduced to zero at the target sent time. 637 // If we allow packets to be sent in a burst, packet are allowed to be 638 // sent early. 639 TimeDelta flush_time = media_debt_ / adjusted_media_rate_; 640 if (now + flush_time > target_send_time) { 641 return nullptr; 642 } 643 } 644 } 645 646 return packet_queue_.Pop(); 647 } 648 649 void PacingController::OnPacketSent(RtpPacketMediaType packet_type, 650 DataSize packet_size, 651 Timestamp send_time) { 652 if (!first_sent_packet_time_ && packet_type != RtpPacketMediaType::kPadding) { 653 first_sent_packet_time_ = send_time; 654 } 655 656 bool audio_packet = packet_type == RtpPacketMediaType::kAudio; 657 if ((!audio_packet || account_for_audio_) && packet_size > DataSize::Zero()) { 658 UpdateBudgetWithSentData(packet_size); 659 } 660 661 last_send_time_ = send_time; 662 } 663 664 void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { 665 media_debt_ -= std::min(media_debt_, adjusted_media_rate_ * delta); 666 padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta); 667 } 668 669 void PacingController::UpdateBudgetWithSentData(DataSize size) { 670 media_debt_ += size; 671 media_debt_ = std::min(media_debt_, adjusted_media_rate_ * kMaxDebtInTime); 672 UpdatePaddingBudgetWithSentData(size); 673 } 674 675 void PacingController::UpdatePaddingBudgetWithSentData(DataSize size) { 676 padding_debt_ += size; 677 padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime); 678 } 679 680 void PacingController::SetQueueTimeLimit(TimeDelta limit) { 681 queue_time_limit_ = limit; 682 } 683 684 void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) { 685 adjusted_media_rate_ = pacing_rate_; 686 if (!drain_large_queues_) { 687 return; 688 } 689 690 DataSize queue_size_data = QueueSizeData(); 691 if (queue_size_data > DataSize::Zero()) { 692 // Assuming equal size packets and input/output rate, the average packet 693 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if 694 // time constraint shall be met. Determine bitrate needed for that. 695 packet_queue_.UpdateAverageQueueTime(now); 696 TimeDelta avg_time_left = 697 std::max(TimeDelta::Millis(1), 698 queue_time_limit_ - packet_queue_.AverageQueueTime()); 699 DataRate min_rate_needed = queue_size_data / avg_time_left; 700 if (min_rate_needed > pacing_rate_) { 701 adjusted_media_rate_ = min_rate_needed; 702 RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps=" 703 << pacing_rate_.kbps(); 704 } 705 } 706 } 707 708 Timestamp PacingController::NextUnpacedSendTime() const { 709 if (!pace_audio_) { 710 Timestamp leading_audio_send_time = 711 packet_queue_.LeadingPacketEnqueueTime(RtpPacketMediaType::kAudio); 712 if (leading_audio_send_time.IsFinite()) { 713 return leading_audio_send_time; 714 } 715 } 716 if (fast_retransmissions_) { 717 Timestamp leading_retransmission_send_time = 718 packet_queue_.LeadingPacketEnqueueTimeForRetransmission(); 719 if (leading_retransmission_send_time.IsFinite()) { 720 return leading_retransmission_send_time; 721 } 722 } 723 return Timestamp::MinusInfinity(); 724 } 725 726 } // namespace webrtc