task_queue_paced_sender.cc (10669B)
1 /* 2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/pacing/task_queue_paced_sender.h" 12 13 #include <algorithm> 14 #include <cstddef> 15 #include <cstdint> 16 #include <memory> 17 #include <optional> 18 #include <utility> 19 #include <vector> 20 21 #include "absl/cleanup/cleanup.h" 22 #include "api/field_trials_view.h" 23 #include "api/sequence_checker.h" 24 #include "api/task_queue/pending_task_safety_flag.h" 25 #include "api/task_queue/task_queue_base.h" 26 #include "api/transport/network_types.h" 27 #include "api/units/data_rate.h" 28 #include "api/units/data_size.h" 29 #include "api/units/time_delta.h" 30 #include "api/units/timestamp.h" 31 #include "modules/pacing/pacing_controller.h" 32 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" 33 #include "rtc_base/checks.h" 34 #include "rtc_base/numerics/exp_filter.h" 35 #include "rtc_base/trace_event.h" 36 37 namespace webrtc { 38 39 const int TaskQueuePacedSender::kNoPacketHoldback = -1; 40 41 TaskQueuePacedSender::TaskQueuePacedSender( 42 Clock* clock, 43 PacingController::PacketSender* packet_sender, 44 const FieldTrialsView& field_trials, 45 TimeDelta max_hold_back_window, 46 int max_hold_back_window_in_packets) 47 : clock_(clock), 48 max_hold_back_window_(max_hold_back_window), 49 max_hold_back_window_in_packets_(max_hold_back_window_in_packets), 50 pacing_controller_(clock, packet_sender, field_trials), 51 next_process_time_(Timestamp::MinusInfinity()), 52 is_started_(false), 53 is_shutdown_(false), 54 packet_size_(/*alpha=*/0.95), 55 include_overhead_(false), 56 task_queue_(TaskQueueBase::Current()) { 57 RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime); 58 } 59 60 TaskQueuePacedSender::~TaskQueuePacedSender() { 61 RTC_DCHECK_RUN_ON(task_queue_); 62 is_shutdown_ = true; 63 } 64 65 void TaskQueuePacedSender::SetSendBurstInterval(TimeDelta burst_interval) { 66 RTC_DCHECK_RUN_ON(task_queue_); 67 pacing_controller_.SetSendBurstInterval(burst_interval); 68 } 69 70 void TaskQueuePacedSender::SetAllowProbeWithoutMediaPacket(bool allow) { 71 RTC_DCHECK_RUN_ON(task_queue_); 72 pacing_controller_.SetAllowProbeWithoutMediaPacket(allow); 73 } 74 75 void TaskQueuePacedSender::EnsureStarted() { 76 RTC_DCHECK_RUN_ON(task_queue_); 77 is_started_ = true; 78 PostMaybeProcessPackets(); 79 } 80 81 void TaskQueuePacedSender::CreateProbeClusters( 82 std::vector<ProbeClusterConfig> probe_cluster_configs) { 83 RTC_DCHECK_RUN_ON(task_queue_); 84 pacing_controller_.CreateProbeClusters(probe_cluster_configs); 85 86 // Probing should be scheduled regardless of if the queue is empty or not in 87 // order to be able to BWE probe before media is sent. 88 task_queue_->PostTask(SafeTask(safety_.flag(), [this]() { 89 RTC_DCHECK_RUN_ON(task_queue_); 90 MaybeProcessPackets(Timestamp::MinusInfinity()); 91 })); 92 } 93 94 void TaskQueuePacedSender::Pause() { 95 RTC_DCHECK_RUN_ON(task_queue_); 96 pacing_controller_.Pause(); 97 } 98 99 void TaskQueuePacedSender::Resume() { 100 RTC_DCHECK_RUN_ON(task_queue_); 101 pacing_controller_.Resume(); 102 PostMaybeProcessPackets(); 103 } 104 105 void TaskQueuePacedSender::SetCongested(bool congested) { 106 RTC_DCHECK_RUN_ON(task_queue_); 107 pacing_controller_.SetCongested(congested); 108 PostMaybeProcessPackets(); 109 } 110 111 void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate, 112 DataRate padding_rate) { 113 RTC_DCHECK_RUN_ON(task_queue_); 114 pacing_controller_.SetPacingRates(pacing_rate, padding_rate); 115 PostMaybeProcessPackets(); 116 } 117 118 void TaskQueuePacedSender::EnqueuePackets( 119 std::vector<std::unique_ptr<RtpPacketToSend>> packets) { 120 task_queue_->PostTask( 121 SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable { 122 RTC_DCHECK_RUN_ON(task_queue_); 123 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"), 124 "TaskQueuePacedSender::EnqueuePackets"); 125 for (auto& packet : packets) { 126 TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"), 127 "TaskQueuePacedSender::EnqueuePackets::Loop", 128 "sequence_number", packet->SequenceNumber(), 129 "rtp_timestamp", packet->Timestamp()); 130 131 size_t packet_size = packet->payload_size() + packet->padding_size(); 132 if (include_overhead_) { 133 packet_size += packet->headers_size(); 134 } 135 packet_size_.Apply(1, packet_size); 136 RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero()); 137 pacing_controller_.EnqueuePacket(std::move(packet)); 138 } 139 MaybeProcessPackets(Timestamp::MinusInfinity()); 140 })); 141 } 142 143 void TaskQueuePacedSender::RemovePacketsForSsrc(uint32_t ssrc) { 144 task_queue_->PostTask(SafeTask(safety_.flag(), [this, ssrc] { 145 RTC_DCHECK_RUN_ON(task_queue_); 146 pacing_controller_.RemovePacketsForSsrc(ssrc); 147 MaybeProcessPackets(Timestamp::MinusInfinity()); 148 })); 149 } 150 151 void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) { 152 RTC_DCHECK_RUN_ON(task_queue_); 153 pacing_controller_.SetAccountForAudioPackets(account_for_audio); 154 PostMaybeProcessPackets(); 155 } 156 157 void TaskQueuePacedSender::SetIncludeOverhead() { 158 RTC_DCHECK_RUN_ON(task_queue_); 159 include_overhead_ = true; 160 pacing_controller_.SetIncludeOverhead(); 161 PostMaybeProcessPackets(); 162 } 163 164 void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) { 165 RTC_DCHECK_RUN_ON(task_queue_); 166 pacing_controller_.SetTransportOverhead(overhead_per_packet); 167 PostMaybeProcessPackets(); 168 } 169 170 void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) { 171 RTC_DCHECK_RUN_ON(task_queue_); 172 pacing_controller_.SetQueueTimeLimit(limit); 173 PostMaybeProcessPackets(); 174 } 175 176 TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const { 177 return GetStats().expected_queue_time; 178 } 179 180 DataSize TaskQueuePacedSender::QueueSizeData() const { 181 return GetStats().queue_size; 182 } 183 184 std::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const { 185 return GetStats().first_sent_packet_time; 186 } 187 188 TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const { 189 Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time; 190 if (oldest_packet.IsInfinite()) { 191 return TimeDelta::Zero(); 192 } 193 194 // (webrtc:9716): The clock is not always monotonic. 195 Timestamp current = clock_->CurrentTime(); 196 if (current < oldest_packet) { 197 return TimeDelta::Zero(); 198 } 199 200 return current - oldest_packet; 201 } 202 203 void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) { 204 RTC_DCHECK_RUN_ON(task_queue_); 205 current_stats_ = stats; 206 } 207 208 void TaskQueuePacedSender::PostMaybeProcessPackets() { 209 if (pacing_controller_.QueueSizePackets() == 0) { 210 return; 211 } 212 task_queue_->PostTask(SafeTask(safety_.flag(), [this]() { 213 RTC_DCHECK_RUN_ON(task_queue_); 214 MaybeProcessPackets(Timestamp::MinusInfinity()); 215 })); 216 } 217 218 void TaskQueuePacedSender::MaybeProcessPackets( 219 Timestamp scheduled_process_time) { 220 RTC_DCHECK_RUN_ON(task_queue_); 221 222 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"), 223 "TaskQueuePacedSender::MaybeProcessPackets"); 224 225 if (is_shutdown_ || !is_started_) { 226 return; 227 } 228 229 // Protects against re-entry from transport feedback calling into the task 230 // queue pacer. 231 RTC_DCHECK(!processing_packets_); 232 processing_packets_ = true; 233 absl::Cleanup cleanup = [this] { 234 RTC_DCHECK_RUN_ON(task_queue_); 235 processing_packets_ = false; 236 }; 237 238 Timestamp next_send_time = pacing_controller_.NextSendTime(); 239 RTC_DCHECK(next_send_time.IsFinite()); 240 const Timestamp now = clock_->CurrentTime(); 241 TimeDelta early_execute_margin = 242 pacing_controller_.IsProbing() 243 ? PacingController::kMaxEarlyProbeProcessing 244 : TimeDelta::Zero(); 245 246 // Process packets and update stats. 247 while (next_send_time <= now + early_execute_margin) { 248 pacing_controller_.ProcessPackets(); 249 next_send_time = pacing_controller_.NextSendTime(); 250 RTC_DCHECK(next_send_time.IsFinite()); 251 252 // Probing state could change. Get margin after process packets. 253 early_execute_margin = pacing_controller_.IsProbing() 254 ? PacingController::kMaxEarlyProbeProcessing 255 : TimeDelta::Zero(); 256 } 257 UpdateStats(); 258 259 // Ignore retired scheduled task, otherwise reset `next_process_time_`. 260 if (scheduled_process_time.IsFinite()) { 261 if (scheduled_process_time != next_process_time_) { 262 return; 263 } 264 next_process_time_ = Timestamp::MinusInfinity(); 265 } 266 267 // Do not hold back in probing. 268 TimeDelta hold_back_window = TimeDelta::Zero(); 269 if (!pacing_controller_.IsProbing()) { 270 hold_back_window = max_hold_back_window_; 271 DataRate pacing_rate = pacing_controller_.pacing_rate(); 272 if (max_hold_back_window_in_packets_ != kNoPacketHoldback && 273 !pacing_rate.IsZero() && 274 packet_size_.filtered() != ExpFilter::kValueUndefined) { 275 TimeDelta avg_packet_send_time = 276 DataSize::Bytes(packet_size_.filtered()) / pacing_rate; 277 hold_back_window = 278 std::min(hold_back_window, 279 avg_packet_send_time * max_hold_back_window_in_packets_); 280 } 281 } 282 283 // Calculate next process time. 284 TimeDelta time_to_next_process = 285 std::max(hold_back_window, next_send_time - now - early_execute_margin); 286 next_send_time = now + time_to_next_process; 287 288 // If no in flight task or in flight task is later than `next_send_time`, 289 // schedule a new one. Previous in flight task will be retired. 290 if (next_process_time_.IsMinusInfinity() || 291 next_process_time_ > next_send_time) { 292 // Prefer low precision if allowed and not probing. 293 task_queue_->PostDelayedHighPrecisionTask( 294 SafeTask( 295 safety_.flag(), 296 [this, next_send_time]() { MaybeProcessPackets(next_send_time); }), 297 time_to_next_process.RoundUpTo(TimeDelta::Millis(1))); 298 next_process_time_ = next_send_time; 299 } 300 } 301 302 void TaskQueuePacedSender::UpdateStats() { 303 Stats new_stats; 304 new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime(); 305 new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime(); 306 new_stats.oldest_packet_enqueue_time = 307 pacing_controller_.OldestPacketEnqueueTime(); 308 new_stats.queue_size = pacing_controller_.QueueSizeData(); 309 OnStatsUpdated(new_stats); 310 } 311 312 TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const { 313 RTC_DCHECK_RUN_ON(task_queue_); 314 return current_stats_; 315 } 316 317 } // namespace webrtc