task_queue_paced_sender_unittest.cc (33934B)
1 /* 2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/pacing/task_queue_paced_sender.h" 12 13 #include <algorithm> 14 #include <cstddef> 15 #include <cstdint> 16 #include <memory> 17 #include <utility> 18 #include <vector> 19 20 #include "absl/cleanup/cleanup.h" 21 #include "api/field_trials.h" 22 #include "api/sequence_checker.h" 23 #include "api/task_queue/task_queue_base.h" 24 #include "api/task_queue/task_queue_factory.h" 25 #include "api/transport/network_types.h" 26 #include "api/units/data_rate.h" 27 #include "api/units/data_size.h" 28 #include "api/units/time_delta.h" 29 #include "api/units/timestamp.h" 30 #include "modules/pacing/pacing_controller.h" 31 #include "modules/pacing/packet_router.h" 32 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 33 #include "test/create_test_field_trials.h" 34 #include "test/gmock.h" 35 #include "test/gtest.h" 36 #include "test/time_controller/simulated_time_controller.h" 37 38 using ::testing::_; 39 using ::testing::AtLeast; 40 using ::testing::AtMost; 41 using ::testing::NiceMock; 42 43 namespace webrtc { 44 namespace { 45 constexpr uint32_t kAudioSsrc = 12345; 46 constexpr uint32_t kVideoSsrc = 234565; 47 constexpr uint32_t kVideoRtxSsrc = 34567; 48 constexpr uint32_t kFlexFecSsrc = 45678; 49 constexpr size_t kDefaultPacketSize = 1234; 50 51 class MockPacketRouter : public PacketRouter { 52 public: 53 MOCK_METHOD(void, 54 SendPacket, 55 (std::unique_ptr<RtpPacketToSend> packet, 56 const PacedPacketInfo& cluster_info), 57 (override)); 58 MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>, 59 FetchFec, 60 (), 61 (override)); 62 MOCK_METHOD(std::vector<std::unique_ptr<RtpPacketToSend>>, 63 GeneratePadding, 64 (DataSize target_size), 65 (override)); 66 }; 67 68 std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding( 69 DataSize target_size) { 70 // 224 bytes is the max padding size for plain padding packets generated by 71 // RTPSender::GeneratePadding(). 72 const DataSize kMaxPaddingPacketSize = DataSize::Bytes(224); 73 DataSize padding_generated = DataSize::Zero(); 74 std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets; 75 while (padding_generated < target_size) { 76 DataSize packet_size = 77 std::min(target_size - padding_generated, kMaxPaddingPacketSize); 78 padding_generated += packet_size; 79 auto padding_packet = 80 std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr); 81 padding_packet->set_packet_type(RtpPacketMediaType::kPadding); 82 padding_packet->SetPadding(packet_size.bytes()); 83 padding_packets.push_back(std::move(padding_packet)); 84 } 85 return padding_packets; 86 } 87 88 } // namespace 89 90 namespace test { 91 92 std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) { 93 auto packet = std::make_unique<RtpPacketToSend>(nullptr); 94 packet->set_packet_type(type); 95 switch (type) { 96 case RtpPacketMediaType::kAudio: 97 packet->SetSsrc(kAudioSsrc); 98 break; 99 case RtpPacketMediaType::kVideo: 100 packet->SetSsrc(kVideoSsrc); 101 break; 102 case RtpPacketMediaType::kRetransmission: 103 case RtpPacketMediaType::kPadding: 104 packet->SetSsrc(kVideoRtxSsrc); 105 break; 106 case RtpPacketMediaType::kForwardErrorCorrection: 107 packet->SetSsrc(kFlexFecSsrc); 108 break; 109 } 110 111 packet->SetPayloadSize(kDefaultPacketSize); 112 return packet; 113 } 114 115 std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets( 116 RtpPacketMediaType type, 117 size_t num_packets) { 118 std::vector<std::unique_ptr<RtpPacketToSend>> packets; 119 for (size_t i = 0; i < num_packets; ++i) { 120 packets.push_back(BuildRtpPacket(type)); 121 } 122 return packets; 123 } 124 125 TEST(TaskQueuePacedSenderTest, PacesPackets) { 126 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 127 MockPacketRouter packet_router; 128 FieldTrials trials = CreateTestFieldTrials(); 129 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 130 PacingController::kMinSleepTime, 131 TaskQueuePacedSender::kNoPacketHoldback); 132 133 // Insert a number of packets, covering one second. 134 static constexpr size_t kPacketsToSend = 42; 135 SequenceChecker sequence_checker; 136 pacer.SetPacingRates( 137 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend), 138 DataRate::Zero()); 139 pacer.EnsureStarted(); 140 pacer.EnqueuePackets( 141 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); 142 143 // Expect all of them to be sent. 144 size_t packets_sent = 0; 145 Timestamp end_time = Timestamp::PlusInfinity(); 146 EXPECT_CALL(packet_router, SendPacket) 147 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> /* packet */, 148 const PacedPacketInfo& /* cluster_info */) { 149 ++packets_sent; 150 if (packets_sent == kPacketsToSend) { 151 end_time = time_controller.GetClock()->CurrentTime(); 152 } 153 EXPECT_TRUE(sequence_checker.IsCurrent()); 154 }); 155 156 const Timestamp start_time = time_controller.GetClock()->CurrentTime(); 157 158 // Packets should be sent over a period of close to 1s. Expect a little 159 // lower than this since initial probing is a bit quicker. 160 time_controller.AdvanceTime(TimeDelta::Seconds(1)); 161 EXPECT_EQ(packets_sent, kPacketsToSend); 162 ASSERT_TRUE(end_time.IsFinite()); 163 EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0); 164 } 165 166 // Same test as above, but with 0.5s of burst applied. 167 TEST(TaskQueuePacedSenderTest, PacesPacketsWithBurst) { 168 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 169 MockPacketRouter packet_router; 170 FieldTrials trials = CreateTestFieldTrials(); 171 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 172 PacingController::kMinSleepTime, 173 TaskQueuePacedSender::kNoPacketHoldback); 174 pacer.SetSendBurstInterval( 175 // Half a second of bursting. 176 TimeDelta::Seconds(0.5)); 177 178 // Insert a number of packets, covering one second. 179 static constexpr size_t kPacketsToSend = 42; 180 SequenceChecker sequence_checker; 181 pacer.SetPacingRates( 182 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend), 183 DataRate::Zero()); 184 pacer.EnsureStarted(); 185 pacer.EnqueuePackets( 186 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); 187 188 // Expect all of them to be sent. 189 size_t packets_sent = 0; 190 Timestamp end_time = Timestamp::PlusInfinity(); 191 EXPECT_CALL(packet_router, SendPacket) 192 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> /* packet */, 193 const PacedPacketInfo& /* cluster_info */) { 194 ++packets_sent; 195 if (packets_sent == kPacketsToSend) { 196 end_time = time_controller.GetClock()->CurrentTime(); 197 } 198 EXPECT_TRUE(sequence_checker.IsCurrent()); 199 }); 200 201 const Timestamp start_time = time_controller.GetClock()->CurrentTime(); 202 203 // Packets should be sent over a period of close to 1s. Expect a little 204 // lower than this since initial probing is a bit quicker. 205 time_controller.AdvanceTime(TimeDelta::Seconds(1)); 206 EXPECT_EQ(packets_sent, kPacketsToSend); 207 ASSERT_TRUE(end_time.IsFinite()); 208 // Because of half a second of burst, what would normally have been paced over 209 // ~1 second now takes ~0.5 seconds. 210 EXPECT_NEAR((end_time - start_time).ms<double>(), 500.0, 50.0); 211 } 212 213 TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) { 214 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 215 MockPacketRouter packet_router; 216 FieldTrials trials = CreateTestFieldTrials(); 217 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 218 PacingController::kMinSleepTime, 219 TaskQueuePacedSender::kNoPacketHoldback); 220 221 // Insert a number of packets to be sent 200ms apart. 222 const size_t kPacketsPerSecond = 5; 223 const DataRate kPacingRate = 224 DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond); 225 pacer.SetPacingRates(kPacingRate, DataRate::Zero()); 226 pacer.EnsureStarted(); 227 228 // Send some initial packets to be rid of any probes. 229 EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond); 230 pacer.EnqueuePackets( 231 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond)); 232 time_controller.AdvanceTime(TimeDelta::Seconds(1)); 233 234 // Insert three packets, and record send time of each of them. 235 // After the second packet is sent, double the send rate so we can 236 // check the third packets is sent after half the wait time. 237 Timestamp first_packet_time = Timestamp::MinusInfinity(); 238 Timestamp second_packet_time = Timestamp::MinusInfinity(); 239 Timestamp third_packet_time = Timestamp::MinusInfinity(); 240 241 EXPECT_CALL(packet_router, SendPacket) 242 .Times(3) 243 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> /* packet */, 244 const PacedPacketInfo& /* cluster_info */) { 245 if (first_packet_time.IsInfinite()) { 246 first_packet_time = time_controller.GetClock()->CurrentTime(); 247 } else if (second_packet_time.IsInfinite()) { 248 second_packet_time = time_controller.GetClock()->CurrentTime(); 249 // Avoid invoke SetPacingRate in the context of sending a packet. 250 time_controller.GetMainThread()->PostTask( 251 [&] { pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero()); }); 252 } else { 253 third_packet_time = time_controller.GetClock()->CurrentTime(); 254 } 255 }); 256 257 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3)); 258 time_controller.AdvanceTime(TimeDelta::Millis(500)); 259 ASSERT_TRUE(third_packet_time.IsFinite()); 260 EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0, 261 1.0); 262 EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0, 263 1.0); 264 } 265 266 TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) { 267 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 268 NiceMock<MockPacketRouter> packet_router; 269 FieldTrials trials = CreateTestFieldTrials(); 270 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 271 PacingController::kMinSleepTime, 272 TaskQueuePacedSender::kNoPacketHoldback); 273 274 const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125); 275 276 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 277 pacer.EnsureStarted(); 278 279 // Add some initial video packets. Not all should be sent immediately. 280 EXPECT_CALL(packet_router, SendPacket).Times(AtMost(9)); 281 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); 282 time_controller.AdvanceTime(TimeDelta::Zero()); 283 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 284 285 // Insert an audio packet, it should be sent immediately. 286 EXPECT_CALL(packet_router, SendPacket); 287 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1)); 288 time_controller.AdvanceTime(TimeDelta::Zero()); 289 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 290 } 291 292 TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) { 293 const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); 294 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 295 NiceMock<MockPacketRouter> packet_router; 296 FieldTrials trials = CreateTestFieldTrials(); 297 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 298 kCoalescingWindow, 299 TaskQueuePacedSender::kNoPacketHoldback); 300 pacer.SetSendBurstInterval(TimeDelta::Zero()); 301 302 // Set rates so one packet adds one ms of buffer level. 303 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 304 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); 305 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 306 307 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 308 pacer.EnsureStarted(); 309 310 // Add 10 packets. The first burst should be sent immediately since the 311 // buffers are clear. 312 EXPECT_CALL(packet_router, SendPacket).Times(AtMost(9)); 313 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); 314 time_controller.AdvanceTime(TimeDelta::Zero()); 315 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 316 317 // Advance time to 1ms before the coalescing window ends. No packets should 318 // be sent. 319 EXPECT_CALL(packet_router, SendPacket).Times(0); 320 time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); 321 322 // Advance time to where coalescing window ends. All packets that should 323 // have been sent up til now will be sent. 324 EXPECT_CALL(packet_router, SendPacket).Times(5); 325 time_controller.AdvanceTime(TimeDelta::Millis(1)); 326 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 327 } 328 329 TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) { 330 const TimeDelta kCoalescingWindow = TimeDelta::Millis(5); 331 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 332 MockPacketRouter packet_router; 333 FieldTrials trials = CreateTestFieldTrials(); 334 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 335 kCoalescingWindow, 336 TaskQueuePacedSender::kNoPacketHoldback); 337 338 // Set rates so one packet adds one ms of buffer level. 339 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 340 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); 341 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 342 343 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 344 pacer.EnsureStarted(); 345 346 // Add 10 packets. The first should be sent immediately since the buffers 347 // are clear. This will also trigger the probe to start. 348 EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); 349 pacer.CreateProbeClusters( 350 {{.at_time = time_controller.GetClock()->CurrentTime(), 351 .target_data_rate = kPacingDataRate * 2, 352 .target_duration = TimeDelta::Millis(15), 353 .target_probe_count = 5, 354 .id = 17}}); 355 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10)); 356 time_controller.AdvanceTime(TimeDelta::Zero()); 357 ::testing::Mock::VerifyAndClearExpectations(&packet_router); 358 359 // Advance time to 1ms before the coalescing window ends. Packets should be 360 // flying. 361 EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); 362 time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1)); 363 } 364 365 TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) { 366 FieldTrials trials = 367 CreateTestFieldTrials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/"); 368 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 369 NiceMock<MockPacketRouter> packet_router; 370 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 371 PacingController::kMinSleepTime, 372 TaskQueuePacedSender::kNoPacketHoldback); 373 pacer.SetSendBurstInterval(TimeDelta::Zero()); 374 375 // Set rates so one packet adds 4ms of buffer level. 376 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 377 const TimeDelta kPacketPacingTime = TimeDelta::Millis(4); 378 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 379 pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero()); 380 pacer.EnsureStarted(); 381 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { 382 return std::vector<std::unique_ptr<RtpPacketToSend>>(); 383 }); 384 EXPECT_CALL(packet_router, GeneratePadding(_)) 385 .WillRepeatedly( 386 [](DataSize target_size) { return GeneratePadding(target_size); }); 387 388 // Enqueue two packets, only the first is sent immediately and the next 389 // will be scheduled for sending in 4ms. 390 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2)); 391 const int kNotAProbe = PacedPacketInfo::kNotAProbe; 392 EXPECT_CALL(packet_router, 393 SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, 394 kNotAProbe))); 395 // Advance to less than 3ms before next packet send time. 396 time_controller.AdvanceTime(TimeDelta::Micros(1001)); 397 398 // Trigger a probe at 2x the current pacing rate and insert the number of 399 // packets the probe needs. 400 const DataRate kProbeRate = 2 * kPacingDataRate; 401 const int kProbeClusterId = 1; 402 pacer.CreateProbeClusters( 403 {{.at_time = time_controller.GetClock()->CurrentTime(), 404 .target_data_rate = kProbeRate, 405 .target_duration = TimeDelta::Millis(15), 406 .target_probe_count = 4, 407 .id = kProbeClusterId}}); 408 409 // Expected size for each probe in a cluster is twice the expected bits sent 410 // during min_probe_delta. 411 // Expect one additional call since probe always starts with a small (1 byte) 412 // padding packet that's not counted into the probe rate here. 413 const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2); 414 const DataSize kProbeSize = kProbeRate * kProbeTimeDelta; 415 const size_t kNumPacketsInProbe = 416 (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize; 417 EXPECT_CALL(packet_router, 418 SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, 419 kProbeClusterId))) 420 .Times(kNumPacketsInProbe + 1); 421 422 pacer.EnqueuePackets( 423 GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe)); 424 time_controller.AdvanceTime(TimeDelta::Zero()); 425 426 // The pacer should have scheduled the next probe to be sent in 427 // kProbeTimeDelta. That there was existing scheduled call less than 428 // PacingController::kMinSleepTime before this should not matter. 429 EXPECT_CALL(packet_router, 430 SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, 431 kProbeClusterId))) 432 .Times(AtLeast(1)); 433 time_controller.AdvanceTime(TimeDelta::Millis(2)); 434 } 435 436 TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { 437 // Set min_probe_delta to be less than kMinSleepTime (1ms). 438 const TimeDelta kMinProbeDelta = TimeDelta::Micros(200); 439 FieldTrials trials = CreateTestFieldTrials(); 440 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 441 MockPacketRouter packet_router; 442 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 443 PacingController::kMinSleepTime, 444 TaskQueuePacedSender::kNoPacketHoldback); 445 446 // Set rates so one packet adds 4ms of buffer level. 447 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 448 const TimeDelta kPacketPacingTime = TimeDelta::Millis(4); 449 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 450 pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero()); 451 pacer.EnsureStarted(); 452 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { 453 return std::vector<std::unique_ptr<RtpPacketToSend>>(); 454 }); 455 EXPECT_CALL(packet_router, GeneratePadding) 456 .WillRepeatedly( 457 [](DataSize target_size) { return GeneratePadding(target_size); }); 458 459 // Set a high probe rate. 460 const int kProbeClusterId = 1; 461 DataRate kProbingRate = kPacingDataRate * 10; 462 463 pacer.CreateProbeClusters( 464 {{.at_time = time_controller.GetClock()->CurrentTime(), 465 .target_data_rate = kProbingRate, 466 .target_duration = TimeDelta::Millis(15), 467 .min_probe_delta = kMinProbeDelta, 468 .target_probe_count = 5, 469 .id = kProbeClusterId}}); 470 471 // Advance time less than PacingController::kMinSleepTime, probing packets 472 // for the first millisecond should be sent immediately. Min delta between 473 // probes is 200us, meaning 4 times per ms we will get least one call to 474 // SendPacket(). 475 DataSize data_sent = DataSize::Zero(); 476 EXPECT_CALL(packet_router, 477 SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, 478 kProbeClusterId))) 479 .Times(AtLeast(4)) 480 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet, 481 const PacedPacketInfo&) { 482 data_sent += 483 DataSize::Bytes(packet->payload_size() + packet->padding_size()); 484 }); 485 486 // Add one packet to kickstart probing, the rest will be padding packets. 487 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); 488 time_controller.AdvanceTime(kMinProbeDelta); 489 490 // Verify the amount of probing data sent. 491 // Probe always starts with a small (1 byte) padding packet that's not 492 // counted into the probe rate here. 493 const DataSize kMinProbeSize = kMinProbeDelta * kProbingRate; 494 EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize); 495 } 496 497 TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) { 498 const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10); 499 const int kPacketBasedHoldback = 5; 500 501 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 502 NiceMock<MockPacketRouter> packet_router; 503 FieldTrials trials = CreateTestFieldTrials(); 504 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 505 kFixedCoalescingWindow, kPacketBasedHoldback); 506 pacer.SetSendBurstInterval(TimeDelta::Zero()); 507 508 // Set rates so one packet adds one ms of buffer level. 509 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 510 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); 511 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 512 const TimeDelta kExpectedHoldbackWindow = 513 kPacketPacingTime * kPacketBasedHoldback; 514 // `kFixedCoalescingWindow` sets the upper bound for the window. 515 ASSERT_GE(kFixedCoalescingWindow, kExpectedHoldbackWindow); 516 517 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 518 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { 519 return std::vector<std::unique_ptr<RtpPacketToSend>>(); 520 }); 521 pacer.EnsureStarted(); 522 523 // Add some packets and wait till all have been sent, so that the pacer 524 // has a valid estimate of packet size. 525 const int kNumWarmupPackets = 40; 526 EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets); 527 pacer.EnqueuePackets( 528 GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets)); 529 // Wait until all packes have been sent, with a 2x margin. 530 time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2)); 531 532 // Enqueue packets. Expect only the first one to be sent immediately. 533 EXPECT_CALL(packet_router, SendPacket).Times(1); 534 pacer.EnqueuePackets( 535 GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback)); 536 time_controller.AdvanceTime(TimeDelta::Zero()); 537 538 // Advance time to 1ms before the coalescing window ends. 539 EXPECT_CALL(packet_router, SendPacket).Times(0); 540 time_controller.AdvanceTime(kExpectedHoldbackWindow - TimeDelta::Millis(1)); 541 542 // Advance past where the coalescing window should end. 543 EXPECT_CALL(packet_router, SendPacket).Times(kPacketBasedHoldback - 1); 544 time_controller.AdvanceTime(TimeDelta::Millis(1)); 545 } 546 547 TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) { 548 const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(2); 549 const int kPacketBasedHoldback = 5; 550 551 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 552 MockPacketRouter packet_router; 553 FieldTrials trials = CreateTestFieldTrials(); 554 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 555 kFixedCoalescingWindow, kPacketBasedHoldback); 556 pacer.SetSendBurstInterval(TimeDelta::Zero()); 557 558 // Set rates so one packet adds one ms of buffer level. 559 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 560 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); 561 const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; 562 const TimeDelta kExpectedPacketHoldbackWindow = 563 kPacketPacingTime * kPacketBasedHoldback; 564 // |kFixedCoalescingWindow| sets the upper bound for the window. 565 ASSERT_LT(kFixedCoalescingWindow, kExpectedPacketHoldbackWindow); 566 567 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 568 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { 569 return std::vector<std::unique_ptr<RtpPacketToSend>>(); 570 }); 571 pacer.EnsureStarted(); 572 573 // Add some packets and wait till all have been sent, so that the pacer 574 // has a valid estimate of packet size. 575 const int kNumWarmupPackets = 40; 576 EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets); 577 pacer.EnqueuePackets( 578 GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets)); 579 // Wait until all packes have been sent, with a 2x margin. 580 time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2)); 581 582 // Enqueue packets. Expect onlt the first one to be sent immediately. 583 EXPECT_CALL(packet_router, SendPacket).Times(1); 584 pacer.EnqueuePackets( 585 GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback)); 586 time_controller.AdvanceTime(TimeDelta::Zero()); 587 588 // Advance time to the fixed coalescing window, that should take presedence so 589 // at least some of the packets should be sent. 590 EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1)); 591 time_controller.AdvanceTime(kFixedCoalescingWindow); 592 } 593 594 TEST(TaskQueuePacedSenderTest, ProbingStopDuringSendLoop) { 595 // Set a low `min_probe_delta` to let probing finish during send loop. 596 FieldTrials trials = CreateTestFieldTrials( 597 "WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/"); 598 599 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 600 MockPacketRouter packet_router; 601 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 602 PacingController::kMinSleepTime, 603 TaskQueuePacedSender::kNoPacketHoldback); 604 605 // Set rates so 2 packets adds 1ms of buffer level. 606 const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); 607 const TimeDelta kPacketPacingTime = TimeDelta::Millis(1); 608 const DataRate kPacingDataRate = 2 * kPacketSize / kPacketPacingTime; 609 610 pacer.SetPacingRates(kPacingDataRate, DataRate::Zero()); 611 pacer.EnsureStarted(); 612 613 EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { 614 return std::vector<std::unique_ptr<RtpPacketToSend>>(); 615 }); 616 EXPECT_CALL(packet_router, GeneratePadding(_)) 617 .WillRepeatedly( 618 [](DataSize target_size) { return GeneratePadding(target_size); }); 619 620 // Set probe rate. 621 const int kProbeClusterId = 1; 622 const DataRate kProbingRate = kPacingDataRate; 623 624 pacer.CreateProbeClusters( 625 {{.at_time = time_controller.GetClock()->CurrentTime(), 626 .target_data_rate = kProbingRate, 627 .target_duration = TimeDelta::Millis(15), 628 .target_probe_count = 4, 629 .id = kProbeClusterId}}); 630 631 const int kPacketsToSend = 100; 632 const TimeDelta kPacketsPacedTime = 633 std::max(kPacketsToSend * kPacketSize / kPacingDataRate, 634 kPacketsToSend * kPacketSize / kProbingRate); 635 636 // Expect all packets and one padding packet sent. 637 EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend + 1); 638 pacer.EnqueuePackets( 639 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); 640 time_controller.AdvanceTime(kPacketsPacedTime + TimeDelta::Millis(1)); 641 } 642 643 TEST(TaskQueuePacedSenderTest, PostedPacketsNotSendFromRemovePacketsForSsrc) { 644 static constexpr Timestamp kStartTime = Timestamp::Millis(1234); 645 GlobalSimulatedTimeController time_controller(kStartTime); 646 FieldTrials trials = CreateTestFieldTrials(); 647 MockPacketRouter packet_router; 648 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 649 PacingController::kMinSleepTime, 650 TaskQueuePacedSender::kNoPacketHoldback); 651 652 static constexpr DataRate kPacingRate = 653 DataRate::BytesPerSec(kDefaultPacketSize * 10); 654 pacer.SetPacingRates(kPacingRate, DataRate::Zero()); 655 pacer.EnsureStarted(); 656 657 auto encoder_queue = time_controller.GetTaskQueueFactory()->CreateTaskQueue( 658 "encoder_queue", TaskQueueFactory::Priority::HIGH); 659 660 EXPECT_CALL(packet_router, SendPacket).Times(5); 661 encoder_queue->PostTask([&pacer] { 662 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 6)); 663 }); 664 665 time_controller.AdvanceTime(TimeDelta::Millis(400)); 666 // 1 packet left. 667 EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(400)); 668 EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime); 669 670 // Enqueue packets while removing ssrcs should not send any more packets. 671 encoder_queue->PostTask( 672 [&pacer, worker_thread = time_controller.GetMainThread()] { 673 worker_thread->PostTask( 674 [&pacer] { pacer.RemovePacketsForSsrc(kVideoSsrc); }); 675 pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 5)); 676 }); 677 time_controller.AdvanceTime(TimeDelta::Seconds(1)); 678 EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Zero()); 679 EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime); 680 EXPECT_EQ(pacer.QueueSizeData(), DataSize::Zero()); 681 EXPECT_EQ(pacer.ExpectedQueueTime(), TimeDelta::Zero()); 682 } 683 684 TEST(TaskQueuePacedSenderTest, Stats) { 685 static constexpr Timestamp kStartTime = Timestamp::Millis(1234); 686 GlobalSimulatedTimeController time_controller(kStartTime); 687 NiceMock<MockPacketRouter> packet_router; 688 FieldTrials trials = CreateTestFieldTrials(); 689 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 690 PacingController::kMinSleepTime, 691 TaskQueuePacedSender::kNoPacketHoldback); 692 693 // Simulate ~2mbps video stream, covering one second. 694 static constexpr size_t kPacketsToSend = 200; 695 static constexpr DataRate kPacingRate = 696 DataRate::BytesPerSec(kDefaultPacketSize * kPacketsToSend); 697 pacer.SetPacingRates(kPacingRate, DataRate::Zero()); 698 pacer.EnsureStarted(); 699 700 // Allowed `QueueSizeData` and `ExpectedQueueTime` deviation. 701 static constexpr size_t kAllowedPacketsDeviation = 1; 702 static constexpr DataSize kAllowedQueueSizeDeviation = 703 DataSize::Bytes(kDefaultPacketSize * kAllowedPacketsDeviation) + 704 kPacingRate * PacingController::kDefaultBurstInterval; 705 static constexpr TimeDelta kAllowedQueueTimeDeviation = 706 kAllowedQueueSizeDeviation / kPacingRate; 707 708 DataSize expected_queue_size = DataSize::MinusInfinity(); 709 TimeDelta expected_queue_time = TimeDelta::MinusInfinity(); 710 711 EXPECT_CALL(packet_router, SendPacket).Times(kPacketsToSend); 712 713 // Stats before insert any packets. 714 EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero()); 715 EXPECT_FALSE(pacer.FirstSentPacketTime().has_value()); 716 EXPECT_TRUE(pacer.QueueSizeData().IsZero()); 717 EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero()); 718 719 pacer.EnqueuePackets( 720 GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend)); 721 722 // Advance to 200ms. 723 time_controller.AdvanceTime(TimeDelta::Millis(200)); 724 EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(200)); 725 EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime); 726 727 expected_queue_size = kPacingRate * TimeDelta::Millis(800); 728 expected_queue_time = expected_queue_size / kPacingRate; 729 EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(), 730 kAllowedQueueSizeDeviation.bytes()); 731 EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(), 732 kAllowedQueueTimeDeviation.ms()); 733 734 // Advance to 500ms. 735 time_controller.AdvanceTime(TimeDelta::Millis(300)); 736 EXPECT_EQ(pacer.OldestPacketWaitTime(), TimeDelta::Millis(500)); 737 EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime); 738 739 expected_queue_size = kPacingRate * TimeDelta::Millis(500); 740 expected_queue_time = expected_queue_size / kPacingRate; 741 EXPECT_NEAR(pacer.QueueSizeData().bytes(), expected_queue_size.bytes(), 742 kAllowedQueueSizeDeviation.bytes()); 743 EXPECT_NEAR(pacer.ExpectedQueueTime().ms(), expected_queue_time.ms(), 744 kAllowedQueueTimeDeviation.ms()); 745 746 // Advance to 1000ms+, expect all packets to be sent. 747 time_controller.AdvanceTime(TimeDelta::Millis(500) + 748 kAllowedQueueTimeDeviation); 749 EXPECT_TRUE(pacer.OldestPacketWaitTime().IsZero()); 750 EXPECT_EQ(pacer.FirstSentPacketTime(), kStartTime); 751 EXPECT_TRUE(pacer.QueueSizeData().IsZero()); 752 EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero()); 753 } 754 755 TEST(TaskQueuePacedSenderTest, 756 ChangePacingRateInSentPacketCallstackDoesNotSendNextPacketInSameStack) { 757 FieldTrials trials = CreateTestFieldTrials(); 758 GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); 759 MockPacketRouter packet_router; 760 TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials, 761 PacingController::kMinSleepTime, 762 TaskQueuePacedSender::kNoPacketHoldback); 763 pacer.EnsureStarted(); 764 pacer.SetPacingRates(DataRate::KilobitsPerSec(500), 765 /*padding_rate=*/DataRate::Zero()); 766 767 bool send_packet_stack = false; 768 EXPECT_CALL(packet_router, SendPacket) 769 .Times(2) 770 .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet, 771 const PacedPacketInfo& cluster_info) { 772 EXPECT_FALSE(send_packet_stack); 773 send_packet_stack = true; 774 absl::Cleanup cleanup = [&] { send_packet_stack = false; }; 775 pacer.SetPacingRates(DataRate::KilobitsPerSec(1000), 776 /*padding_rate=*/DataRate::Zero()); 777 }); 778 pacer.EnqueuePackets( 779 GeneratePackets(RtpPacketMediaType::kVideo, /*num_packets=*/2)); 780 time_controller.AdvanceTime(TimeDelta::Millis(10)); 781 } 782 783 } // namespace test 784 } // namespace webrtc