rtp_transport_controller_send.cc (31568B)
1 /* 2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 #include "call/rtp_transport_controller_send.h" 11 12 #include <cstddef> 13 #include <cstdint> 14 #include <map> 15 #include <memory> 16 #include <optional> 17 #include <string> 18 #include <utility> 19 #include <vector> 20 21 #include "absl/strings/string_view.h" 22 #include "api/array_view.h" 23 #include "api/call/transport.h" 24 #include "api/fec_controller.h" 25 #include "api/frame_transformer_interface.h" 26 #include "api/rtc_event_log/rtc_event_log.h" 27 #include "api/rtp_packet_sender.h" 28 #include "api/scoped_refptr.h" 29 #include "api/sequence_checker.h" 30 #include "api/task_queue/pending_task_safety_flag.h" 31 #include "api/task_queue/task_queue_base.h" 32 #include "api/transport/bandwidth_estimation_settings.h" 33 #include "api/transport/bitrate_settings.h" 34 #include "api/transport/goog_cc_factory.h" 35 #include "api/transport/network_control.h" 36 #include "api/transport/network_types.h" 37 #include "api/units/data_rate.h" 38 #include "api/units/data_size.h" 39 #include "api/units/time_delta.h" 40 #include "api/units/timestamp.h" 41 #include "call/rtp_config.h" 42 #include "call/rtp_transport_config.h" 43 #include "call/rtp_transport_controller_send_interface.h" 44 #include "call/rtp_video_sender.h" 45 #include "call/rtp_video_sender_interface.h" 46 #include "logging/rtc_event_log/events/rtc_event_route_change.h" 47 #include "modules/congestion_controller/rtp/control_handler.h" 48 #include "modules/pacing/packet_router.h" 49 #include "modules/rtp_rtcp/include/report_block_data.h" 50 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 51 #include "modules/rtp_rtcp/source/rtcp_packet/congestion_control_feedback.h" 52 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" 53 #include "modules/rtp_rtcp/source/rtp_rtcp_interface.h" 54 #include "rtc_base/checks.h" 55 #include "rtc_base/experiments/field_trial_parser.h" 56 #include "rtc_base/logging.h" 57 #include "rtc_base/network/sent_packet.h" 58 #include "rtc_base/network_route.h" 59 #include "rtc_base/rate_limiter.h" 60 #include "rtc_base/task_utils/repeating_task.h" 61 62 namespace webrtc { 63 namespace { 64 const int64_t kRetransmitWindowSizeMs = 500; 65 const size_t kMaxOverheadBytes = 500; 66 67 constexpr TimeDelta kPacerQueueUpdateInterval = TimeDelta::Millis(25); 68 69 TargetRateConstraints ConvertConstraints(int min_bitrate_bps, 70 int max_bitrate_bps, 71 int start_bitrate_bps, 72 Clock* clock) { 73 TargetRateConstraints msg; 74 msg.at_time = clock->CurrentTime(); 75 msg.min_data_rate = min_bitrate_bps >= 0 76 ? DataRate::BitsPerSec(min_bitrate_bps) 77 : DataRate::Zero(); 78 msg.max_data_rate = max_bitrate_bps > 0 79 ? DataRate::BitsPerSec(max_bitrate_bps) 80 : DataRate::Infinity(); 81 if (start_bitrate_bps > 0) 82 msg.starting_rate = DataRate::BitsPerSec(start_bitrate_bps); 83 return msg; 84 } 85 86 TargetRateConstraints ConvertConstraints(const BitrateConstraints& contraints, 87 Clock* clock) { 88 return ConvertConstraints(contraints.min_bitrate_bps, 89 contraints.max_bitrate_bps, 90 contraints.start_bitrate_bps, clock); 91 } 92 93 bool IsRelayed(const NetworkRoute& route) { 94 return route.local.uses_turn() || route.remote.uses_turn(); 95 } 96 } // namespace 97 98 RtpTransportControllerSend::RtpTransportControllerSend( 99 const RtpTransportConfig& config) 100 : env_(config.env), 101 task_queue_(TaskQueueBase::Current()), 102 bitrate_configurator_(config.bitrate_config), 103 pacer_started_(false), 104 pacer_(&env_.clock(), 105 &packet_router_, 106 env_.field_trials(), 107 TimeDelta::Millis(5), 108 3), 109 observer_(nullptr), 110 controller_factory_override_(config.network_controller_factory), 111 controller_factory_fallback_( 112 std::make_unique<GoogCcNetworkControllerFactory>( 113 GoogCcFactoryConfig{.network_state_predictor_factory = 114 config.network_state_predictor_factory})), 115 process_interval_(controller_factory_fallback_->GetProcessInterval()), 116 last_report_block_time_(env_.clock().CurrentTime()), 117 initial_config_(env_), 118 reset_feedback_on_route_change_( 119 !env_.field_trials().IsEnabled("WebRTC-Bwe-NoFeedbackReset")), 120 add_pacing_to_cwin_(env_.field_trials().IsEnabled( 121 "WebRTC-AddPacingToCongestionWindowPushback")), 122 reset_bwe_on_adapter_id_change_( 123 env_.field_trials().IsEnabled("WebRTC-Bwe-ResetOnAdapterIdChange")), 124 relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()), 125 transport_overhead_bytes_per_packet_(0), 126 network_available_(false), 127 congestion_window_size_(DataSize::PlusInfinity()), 128 is_congested_(false), 129 retransmission_rate_limiter_(&env_.clock(), kRetransmitWindowSizeMs) { 130 ParseFieldTrial( 131 {&relay_bandwidth_cap_}, 132 env_.field_trials().Lookup("WebRTC-Bwe-NetworkRouteConstraints")); 133 initial_config_.constraints = 134 ConvertConstraints(config.bitrate_config, &env_.clock()); 135 RTC_DCHECK(config.bitrate_config.start_bitrate_bps > 0); 136 137 pacer_.SetPacingRates( 138 DataRate::BitsPerSec(config.bitrate_config.start_bitrate_bps), 139 DataRate::Zero()); 140 if (config.pacer_burst_interval) { 141 // Default burst interval overriden by config. 142 pacer_.SetSendBurstInterval(*config.pacer_burst_interval); 143 } 144 packet_router_.RegisterNotifyBweCallback( 145 [this](const RtpPacketToSend& packet, 146 const PacedPacketInfo& pacing_info) { 147 return NotifyBweOfPacedSentPacket(packet, pacing_info); 148 }); 149 } 150 151 RtpTransportControllerSend::~RtpTransportControllerSend() { 152 RTC_DCHECK_RUN_ON(&sequence_checker_); 153 RTC_DCHECK(video_rtp_senders_.empty()); 154 pacer_queue_update_task_.Stop(); 155 controller_task_.Stop(); 156 } 157 158 RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender( 159 const std::map<uint32_t, RtpState>& suspended_ssrcs, 160 const std::map<uint32_t, RtpPayloadState>& states, 161 const RtpConfig& rtp_config, 162 int rtcp_report_interval_ms, 163 Transport* send_transport, 164 const RtpSenderObservers& observers, 165 std::unique_ptr<FecController> fec_controller, 166 const RtpSenderFrameEncryptionConfig& frame_encryption_config, 167 scoped_refptr<FrameTransformerInterface> frame_transformer) { 168 RTC_DCHECK_RUN_ON(&sequence_checker_); 169 video_rtp_senders_.push_back(std::make_unique<RtpVideoSender>( 170 env_, task_queue_, suspended_ssrcs, states, rtp_config, 171 rtcp_report_interval_ms, send_transport, observers, 172 // TODO(holmer): Remove this circular dependency by injecting 173 // the parts of RtpTransportControllerSendInterface that are really used. 174 this, &retransmission_rate_limiter_, std::move(fec_controller), 175 frame_encryption_config.frame_encryptor, 176 frame_encryption_config.crypto_options, std::move(frame_transformer))); 177 return video_rtp_senders_.back().get(); 178 } 179 180 void RtpTransportControllerSend::DestroyRtpVideoSender( 181 RtpVideoSenderInterface* rtp_video_sender) { 182 RTC_DCHECK_RUN_ON(&sequence_checker_); 183 std::vector<std::unique_ptr<RtpVideoSenderInterface>>::iterator it = 184 video_rtp_senders_.end(); 185 for (it = video_rtp_senders_.begin(); it != video_rtp_senders_.end(); ++it) { 186 if (it->get() == rtp_video_sender) { 187 break; 188 } 189 } 190 RTC_DCHECK(it != video_rtp_senders_.end()); 191 video_rtp_senders_.erase(it); 192 } 193 194 void RtpTransportControllerSend::RegisterSendingRtpStream( 195 RtpRtcpInterface& rtp_module) { 196 RTC_DCHECK_RUN_ON(&sequence_checker_); 197 // Allow pacer to send packets using this module. 198 packet_router_.AddSendRtpModule(&rtp_module, 199 /*remb_candidate=*/true); 200 pacer_.SetAllowProbeWithoutMediaPacket( 201 bwe_settings_.allow_probe_without_media && 202 packet_router_.SupportsRtxPayloadPadding()); 203 } 204 205 void RtpTransportControllerSend::DeRegisterSendingRtpStream( 206 RtpRtcpInterface& rtp_module) { 207 RTC_DCHECK_RUN_ON(&sequence_checker_); 208 // Disabling media, remove from packet router map to reduce size and 209 // prevent any stray packets in the pacer from asynchronously arriving 210 // to a disabled module. 211 packet_router_.RemoveSendRtpModule(&rtp_module); 212 // Clear the pacer queue of any packets pertaining to this module. 213 pacer_.RemovePacketsForSsrc(rtp_module.SSRC()); 214 if (rtp_module.RtxSsrc().has_value()) { 215 pacer_.RemovePacketsForSsrc(*rtp_module.RtxSsrc()); 216 } 217 if (rtp_module.FlexfecSsrc().has_value()) { 218 pacer_.RemovePacketsForSsrc(*rtp_module.FlexfecSsrc()); 219 } 220 pacer_.SetAllowProbeWithoutMediaPacket( 221 bwe_settings_.allow_probe_without_media && 222 packet_router_.SupportsRtxPayloadPadding()); 223 } 224 225 void RtpTransportControllerSend::UpdateControlState() { 226 std::optional<TargetTransferRate> update = control_handler_->GetUpdate(); 227 if (!update) 228 return; 229 retransmission_rate_limiter_.SetMaxRate(update->target_rate.bps()); 230 // We won't create control_handler_ until we have an observers. 231 RTC_DCHECK(observer_ != nullptr); 232 observer_->OnTargetTransferRate(*update); 233 } 234 235 void RtpTransportControllerSend::UpdateCongestedState() { 236 if (auto update = GetCongestedStateUpdate()) { 237 is_congested_ = update.value(); 238 pacer_.SetCongested(update.value()); 239 } 240 } 241 242 std::optional<bool> RtpTransportControllerSend::GetCongestedStateUpdate() 243 const { 244 bool congested = transport_feedback_adapter_.GetOutstandingData() >= 245 congestion_window_size_; 246 if (congested != is_congested_) 247 return congested; 248 return std::nullopt; 249 } 250 251 PacketRouter* RtpTransportControllerSend::packet_router() { 252 return &packet_router_; 253 } 254 255 NetworkStateEstimateObserver* 256 RtpTransportControllerSend::network_state_estimate_observer() { 257 return this; 258 } 259 260 RtpPacketSender* RtpTransportControllerSend::packet_sender() { 261 return &pacer_; 262 } 263 264 void RtpTransportControllerSend::SetAllocatedSendBitrateLimits( 265 BitrateAllocationLimits limits) { 266 RTC_DCHECK_RUN_ON(&sequence_checker_); 267 streams_config_.min_total_allocated_bitrate = limits.min_allocatable_rate; 268 streams_config_.max_padding_rate = limits.max_padding_rate; 269 streams_config_.max_total_allocated_bitrate = limits.max_allocatable_rate; 270 UpdateStreamsConfig(); 271 } 272 void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) { 273 RTC_DCHECK_RUN_ON(&sequence_checker_); 274 streams_config_.pacing_factor = pacing_factor; 275 UpdateStreamsConfig(); 276 } 277 void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) { 278 pacer_.SetQueueTimeLimit(TimeDelta::Millis(limit_ms)); 279 } 280 StreamFeedbackProvider* 281 RtpTransportControllerSend::GetStreamFeedbackProvider() { 282 return &feedback_demuxer_; 283 } 284 285 void RtpTransportControllerSend::ReconfigureBandwidthEstimation( 286 const BandwidthEstimationSettings& settings) { 287 RTC_DCHECK_RUN_ON(&sequence_checker_); 288 bwe_settings_ = settings; 289 290 streams_config_.enable_repeated_initial_probing = 291 bwe_settings_.allow_probe_without_media; 292 bool allow_probe_without_media = bwe_settings_.allow_probe_without_media && 293 packet_router_.SupportsRtxPayloadPadding(); 294 pacer_.SetAllowProbeWithoutMediaPacket(allow_probe_without_media); 295 296 if (controller_) { 297 // Recreate the controller and handler. 298 control_handler_ = nullptr; 299 controller_ = nullptr; 300 // The BWE controller is created when/if the network is available. 301 MaybeCreateControllers(); 302 if (controller_) { 303 BitrateConstraints constraints = bitrate_configurator_.GetConfig(); 304 UpdateBitrateConstraints(constraints); 305 UpdateStreamsConfig(); 306 UpdateNetworkAvailability(); 307 } 308 } 309 } 310 311 void RtpTransportControllerSend::RegisterTargetTransferRateObserver( 312 TargetTransferRateObserver* observer) { 313 RTC_DCHECK_RUN_ON(&sequence_checker_); 314 RTC_DCHECK(observer_ == nullptr); 315 observer_ = observer; 316 observer_->OnStartRateUpdate(*initial_config_.constraints.starting_rate); 317 MaybeCreateControllers(); 318 } 319 320 bool RtpTransportControllerSend::IsRelevantRouteChange( 321 const NetworkRoute& old_route, 322 const NetworkRoute& new_route) const { 323 bool connected_changed = old_route.connected != new_route.connected; 324 bool route_ids_changed = false; 325 bool relaying_changed = false; 326 327 if (reset_bwe_on_adapter_id_change_) { 328 route_ids_changed = 329 old_route.local.adapter_id() != new_route.local.adapter_id() || 330 old_route.remote.adapter_id() != new_route.remote.adapter_id(); 331 } else { 332 route_ids_changed = 333 old_route.local.network_id() != new_route.local.network_id() || 334 old_route.remote.network_id() != new_route.remote.network_id(); 335 } 336 if (relay_bandwidth_cap_->IsFinite()) { 337 relaying_changed = IsRelayed(old_route) != IsRelayed(new_route); 338 } 339 return connected_changed || route_ids_changed || relaying_changed; 340 } 341 342 void RtpTransportControllerSend::OnNetworkRouteChanged( 343 absl::string_view transport_name, 344 const NetworkRoute& network_route) { 345 RTC_DCHECK_RUN_ON(&sequence_checker_); 346 // Check if the network route is connected. 347 if (!network_route.connected) { 348 // TODO(honghaiz): Perhaps handle this in SignalChannelNetworkState and 349 // consider merging these two methods. 350 return; 351 } 352 353 std::optional<BitrateConstraints> relay_constraint_update = 354 ApplyOrLiftRelayCap(IsRelayed(network_route)); 355 356 // Check whether the network route has changed on each transport. 357 auto result = network_routes_.insert( 358 // Explicit conversion of transport_name to std::string here is necessary 359 // to support some platforms that cannot yet deal with implicit 360 // conversion in these types of situations. 361 std::make_pair(std::string(transport_name), network_route)); 362 auto kv = result.first; 363 bool inserted = result.second; 364 if (inserted || !(kv->second == network_route)) { 365 RTC_LOG(LS_INFO) << "Network route changed on transport " << transport_name 366 << ": new_route = " << network_route.DebugString(); 367 if (!inserted) { 368 RTC_LOG(LS_INFO) << "old_route = " << kv->second.DebugString(); 369 } 370 } 371 372 if (inserted) { 373 if (relay_constraint_update.has_value()) { 374 UpdateBitrateConstraints(*relay_constraint_update); 375 } 376 transport_overhead_bytes_per_packet_ = network_route.packet_overhead; 377 // No need to reset BWE if this is the first time the network connects. 378 return; 379 } 380 381 const NetworkRoute old_route = kv->second; 382 kv->second = network_route; 383 384 // Check if enough conditions of the new/old route has changed 385 // to trigger resetting of bitrates (and a probe). 386 if (IsRelevantRouteChange(old_route, network_route)) { 387 BitrateConstraints bitrate_config = bitrate_configurator_.GetConfig(); 388 RTC_LOG(LS_INFO) << "Reset bitrates to min: " 389 << bitrate_config.min_bitrate_bps 390 << " bps, start: " << bitrate_config.start_bitrate_bps 391 << " bps, max: " << bitrate_config.max_bitrate_bps 392 << " bps."; 393 RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0); 394 395 env_.event_log().Log(std::make_unique<RtcEventRouteChange>( 396 network_route.connected, network_route.packet_overhead)); 397 if (transport_maybe_support_ecn_) { 398 sending_packets_as_ect1_ = true; 399 packet_router_.ConfigureForRfc8888Feedback(sending_packets_as_ect1_); 400 } 401 NetworkRouteChange msg; 402 msg.at_time = env_.clock().CurrentTime(); 403 msg.constraints = ConvertConstraints(bitrate_config, &env_.clock()); 404 transport_overhead_bytes_per_packet_ = network_route.packet_overhead; 405 if (reset_feedback_on_route_change_) { 406 transport_feedback_adapter_.SetNetworkRoute(network_route); 407 } 408 if (controller_) { 409 PostUpdates(controller_->OnNetworkRouteChange(msg)); 410 } else { 411 UpdateInitialConstraints(msg.constraints); 412 } 413 is_congested_ = false; 414 pacer_.SetCongested(false); 415 } 416 } 417 void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) { 418 RTC_DCHECK_RUN_ON(&sequence_checker_); 419 RTC_LOG(LS_VERBOSE) << "SignalNetworkState " 420 << (network_available ? "Up" : "Down"); 421 network_available_ = network_available; 422 if (network_available) { 423 pacer_.Resume(); 424 } else { 425 pacer_.Pause(); 426 } 427 is_congested_ = false; 428 pacer_.SetCongested(false); 429 430 if (!controller_) { 431 MaybeCreateControllers(); 432 } 433 UpdateNetworkAvailability(); 434 for (auto& rtp_sender : video_rtp_senders_) { 435 rtp_sender->OnNetworkAvailability(network_available); 436 } 437 } 438 NetworkLinkRtcpObserver* RtpTransportControllerSend::GetRtcpObserver() { 439 return this; 440 } 441 int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const { 442 return pacer_.OldestPacketWaitTime().ms(); 443 } 444 std::optional<Timestamp> RtpTransportControllerSend::GetFirstPacketTime() 445 const { 446 return pacer_.FirstSentPacketTime(); 447 } 448 void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { 449 RTC_DCHECK_RUN_ON(&sequence_checker_); 450 451 streams_config_.requests_alr_probing = enable; 452 UpdateStreamsConfig(); 453 } 454 void RtpTransportControllerSend::OnSentPacket( 455 const SentPacketInfo& sent_packet) { 456 // Normally called on the network thread! 457 // TODO(crbug.com/1373439): Clarify other thread contexts calling in, 458 // and simplify task posting logic when the combined network/worker project 459 // launches. 460 if (TaskQueueBase::Current() != task_queue_) { 461 task_queue_->PostTask(SafeTask(safety_.flag(), [this, sent_packet]() { 462 RTC_DCHECK_RUN_ON(&sequence_checker_); 463 ProcessSentPacket(sent_packet); 464 })); 465 return; 466 } 467 468 RTC_DCHECK_RUN_ON(&sequence_checker_); 469 ProcessSentPacket(sent_packet); 470 } 471 472 void RtpTransportControllerSend::ProcessSentPacket( 473 const SentPacketInfo& sent_packet) { 474 RTC_DCHECK_RUN_ON(&sequence_checker_); 475 std::optional<SentPacket> packet_msg = 476 transport_feedback_adapter_.ProcessSentPacket(sent_packet); 477 if (!packet_msg) 478 return; 479 480 auto congestion_update = GetCongestedStateUpdate(); 481 NetworkControlUpdate control_update; 482 if (controller_) 483 control_update = controller_->OnSentPacket(*packet_msg); 484 if (!congestion_update && !control_update.has_updates()) 485 return; 486 ProcessSentPacketUpdates(std::move(control_update)); 487 } 488 489 // RTC_RUN_ON(task_queue_) 490 void RtpTransportControllerSend::ProcessSentPacketUpdates( 491 NetworkControlUpdate updates) { 492 RTC_DCHECK_RUN_ON(&sequence_checker_); 493 // Only update outstanding data if: 494 // 1. Packet feedback is used. 495 // 2. The packet has not yet received an acknowledgement. 496 // 3. It is not a retransmission of an earlier packet. 497 UpdateCongestedState(); 498 if (controller_) { 499 PostUpdates(std::move(updates)); 500 } 501 } 502 503 void RtpTransportControllerSend::OnReceivedPacket( 504 const ReceivedPacket& packet_msg) { 505 RTC_DCHECK_RUN_ON(&sequence_checker_); 506 if (controller_) 507 PostUpdates(controller_->OnReceivedPacket(packet_msg)); 508 } 509 510 void RtpTransportControllerSend::UpdateBitrateConstraints( 511 const BitrateConstraints& updated) { 512 RTC_DCHECK_RUN_ON(&sequence_checker_); 513 TargetRateConstraints msg = ConvertConstraints(updated, &env_.clock()); 514 if (controller_) { 515 PostUpdates(controller_->OnTargetRateConstraints(msg)); 516 } else { 517 UpdateInitialConstraints(msg); 518 } 519 } 520 521 void RtpTransportControllerSend::SetSdpBitrateParameters( 522 const BitrateConstraints& constraints) { 523 RTC_DCHECK_RUN_ON(&sequence_checker_); 524 std::optional<BitrateConstraints> updated = 525 bitrate_configurator_.UpdateWithSdpParameters(constraints); 526 if (updated.has_value()) { 527 UpdateBitrateConstraints(*updated); 528 } else { 529 RTC_LOG(LS_VERBOSE) 530 << "WebRTC.RtpTransportControllerSend.SetSdpBitrateParameters: " 531 "nothing to update"; 532 } 533 } 534 535 void RtpTransportControllerSend::SetClientBitratePreferences( 536 const BitrateSettings& preferences) { 537 RTC_DCHECK_RUN_ON(&sequence_checker_); 538 std::optional<BitrateConstraints> updated = 539 bitrate_configurator_.UpdateWithClientPreferences(preferences); 540 if (updated.has_value()) { 541 UpdateBitrateConstraints(*updated); 542 } else { 543 RTC_LOG(LS_VERBOSE) 544 << "WebRTC.RtpTransportControllerSend.SetClientBitratePreferences: " 545 "nothing to update"; 546 } 547 } 548 549 std::optional<BitrateConstraints> 550 RtpTransportControllerSend::ApplyOrLiftRelayCap(bool is_relayed) { 551 DataRate cap = is_relayed ? relay_bandwidth_cap_ : DataRate::PlusInfinity(); 552 return bitrate_configurator_.UpdateWithRelayCap(cap); 553 } 554 555 void RtpTransportControllerSend::OnTransportOverheadChanged( 556 size_t transport_overhead_bytes_per_packet) { 557 RTC_DCHECK_RUN_ON(&sequence_checker_); 558 if (transport_overhead_bytes_per_packet >= kMaxOverheadBytes) { 559 RTC_LOG(LS_ERROR) << "Transport overhead exceeds " << kMaxOverheadBytes; 560 return; 561 } 562 563 pacer_.SetTransportOverhead( 564 DataSize::Bytes(transport_overhead_bytes_per_packet)); 565 566 // TODO(holmer): Call AudioRtpSenders when they have been moved to 567 // RtpTransportControllerSend. 568 for (auto& rtp_video_sender : video_rtp_senders_) { 569 rtp_video_sender->OnTransportOverheadChanged( 570 transport_overhead_bytes_per_packet); 571 } 572 } 573 574 void RtpTransportControllerSend::AccountForAudioPacketsInPacedSender( 575 bool account_for_audio) { 576 pacer_.SetAccountForAudioPackets(account_for_audio); 577 } 578 579 void RtpTransportControllerSend::IncludeOverheadInPacedSender() { 580 pacer_.SetIncludeOverhead(); 581 } 582 583 void RtpTransportControllerSend::EnsureStarted() { 584 RTC_DCHECK_RUN_ON(&sequence_checker_); 585 if (!pacer_started_) { 586 pacer_started_ = true; 587 pacer_.EnsureStarted(); 588 } 589 } 590 591 void RtpTransportControllerSend::OnReceiverEstimatedMaxBitrate( 592 Timestamp receive_time, 593 DataRate bitrate) { 594 RTC_DCHECK_RUN_ON(&sequence_checker_); 595 RemoteBitrateReport msg; 596 msg.receive_time = receive_time; 597 msg.bandwidth = bitrate; 598 if (controller_) 599 PostUpdates(controller_->OnRemoteBitrateReport(msg)); 600 } 601 602 void RtpTransportControllerSend::OnRttUpdate(Timestamp receive_time, 603 TimeDelta rtt) { 604 RTC_DCHECK_RUN_ON(&sequence_checker_); 605 RoundTripTimeUpdate report; 606 report.receive_time = receive_time; 607 report.round_trip_time = rtt.RoundTo(TimeDelta::Millis(1)); 608 report.smoothed = false; 609 if (controller_ && !report.round_trip_time.IsZero()) 610 PostUpdates(controller_->OnRoundTripTimeUpdate(report)); 611 } 612 613 void RtpTransportControllerSend::NotifyBweOfPacedSentPacket( 614 const RtpPacketToSend& packet, 615 const PacedPacketInfo& pacing_info) { 616 RTC_DCHECK_RUN_ON(&sequence_checker_); 617 618 if (!packet.transport_sequence_number()) { 619 return; 620 } 621 if (!packet.packet_type()) { 622 RTC_DCHECK_NOTREACHED() << "Unknown packet type"; 623 return; 624 } 625 Timestamp creation_time = env_.clock().CurrentTime(); 626 transport_feedback_adapter_.AddPacket( 627 packet, pacing_info, transport_overhead_bytes_per_packet_, creation_time); 628 } 629 630 void RtpTransportControllerSend:: 631 EnableCongestionControlFeedbackAccordingToRfc8888() { 632 RTC_DCHECK_RUN_ON(&sequence_checker_); 633 transport_maybe_support_ecn_ = true; 634 sending_packets_as_ect1_ = true; 635 packet_router_.ConfigureForRfc8888Feedback(sending_packets_as_ect1_); 636 } 637 638 std::optional<int> 639 RtpTransportControllerSend::ReceivedCongestionControlFeedbackCount() const { 640 RTC_DCHECK_RUN_ON(&sequence_checker_); 641 if (!transport_maybe_support_ecn_) { 642 return std::nullopt; 643 } 644 return feedback_count_; 645 } 646 647 std::optional<int> 648 RtpTransportControllerSend::ReceivedTransportCcFeedbackCount() const { 649 RTC_DCHECK_RUN_ON(&sequence_checker_); 650 if (transport_maybe_support_ecn_) { 651 return std::nullopt; 652 } 653 return transport_cc_feedback_count_; 654 } 655 656 void RtpTransportControllerSend::OnTransportFeedback( 657 Timestamp receive_time, 658 const rtcp::TransportFeedback& feedback) { 659 RTC_DCHECK_RUN_ON(&sequence_checker_); 660 ++transport_cc_feedback_count_; 661 std::optional<TransportPacketsFeedback> feedback_msg = 662 transport_feedback_adapter_.ProcessTransportFeedback(feedback, 663 receive_time); 664 if (feedback_msg) { 665 HandleTransportPacketsFeedback(*feedback_msg); 666 } 667 } 668 669 void RtpTransportControllerSend::OnCongestionControlFeedback( 670 Timestamp receive_time, 671 const rtcp::CongestionControlFeedback& feedback) { 672 RTC_DCHECK_RUN_ON(&sequence_checker_); 673 ++feedback_count_; 674 std::optional<TransportPacketsFeedback> feedback_msg = 675 transport_feedback_adapter_.ProcessCongestionControlFeedback( 676 feedback, receive_time); 677 if (feedback_msg) { 678 HandleTransportPacketsFeedback(*feedback_msg); 679 } 680 } 681 682 void RtpTransportControllerSend::HandleTransportPacketsFeedback( 683 const TransportPacketsFeedback& feedback) { 684 if (sending_packets_as_ect1_) { 685 // If transport does not support ECN, packets should not be sent as ECT(1). 686 // TODO: bugs.webrtc.org/42225697 - adapt to ECN feedback and continue to 687 // send packets as ECT(1) if transport is ECN capable. 688 sending_packets_as_ect1_ = false; 689 RTC_LOG(LS_INFO) << "Transport is " 690 << (feedback.transport_supports_ecn ? "" : "not ") 691 << "ECN capable. Stop sending ECT(1)."; 692 packet_router_.ConfigureForRfc8888Feedback(sending_packets_as_ect1_); 693 } 694 695 feedback_demuxer_.OnTransportFeedback(feedback); 696 if (controller_) 697 PostUpdates(controller_->OnTransportPacketsFeedback(feedback)); 698 699 // Only update outstanding data if any packet is first time acked. 700 UpdateCongestedState(); 701 } 702 703 void RtpTransportControllerSend::OnRemoteNetworkEstimate( 704 NetworkStateEstimate estimate) { 705 RTC_DCHECK_RUN_ON(&sequence_checker_); 706 estimate.update_time = env_.clock().CurrentTime(); 707 if (controller_) 708 PostUpdates(controller_->OnNetworkStateEstimate(estimate)); 709 } 710 711 void RtpTransportControllerSend::MaybeCreateControllers() { 712 RTC_DCHECK(!controller_); 713 RTC_DCHECK(!control_handler_); 714 715 if (!network_available_ || !observer_) 716 return; 717 control_handler_ = std::make_unique<CongestionControlHandler>(); 718 719 initial_config_.constraints.at_time = env_.clock().CurrentTime(); 720 initial_config_.stream_based_config = streams_config_; 721 722 // TODO(srte): Use fallback controller if no feedback is available. 723 if (controller_factory_override_) { 724 RTC_LOG(LS_INFO) << "Creating overridden congestion controller"; 725 controller_ = controller_factory_override_->Create(initial_config_); 726 process_interval_ = controller_factory_override_->GetProcessInterval(); 727 } else { 728 RTC_LOG(LS_INFO) << "Creating fallback congestion controller"; 729 controller_ = controller_factory_fallback_->Create(initial_config_); 730 process_interval_ = controller_factory_fallback_->GetProcessInterval(); 731 } 732 UpdateControllerWithTimeInterval(); 733 StartProcessPeriodicTasks(); 734 } 735 736 void RtpTransportControllerSend::UpdateNetworkAvailability() { 737 if (!controller_) { 738 return; 739 } 740 NetworkAvailability msg; 741 msg.at_time = env_.clock().CurrentTime(); 742 msg.network_available = network_available_; 743 control_handler_->SetNetworkAvailability(network_available_); 744 PostUpdates(controller_->OnNetworkAvailability(msg)); 745 UpdateControlState(); 746 } 747 748 void RtpTransportControllerSend::UpdateInitialConstraints( 749 TargetRateConstraints new_contraints) { 750 if (!new_contraints.starting_rate) 751 new_contraints.starting_rate = initial_config_.constraints.starting_rate; 752 RTC_DCHECK(new_contraints.starting_rate); 753 initial_config_.constraints = new_contraints; 754 } 755 756 void RtpTransportControllerSend::StartProcessPeriodicTasks() { 757 RTC_DCHECK_RUN_ON(&sequence_checker_); 758 if (!pacer_queue_update_task_.Running()) { 759 pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart( 760 task_queue_, kPacerQueueUpdateInterval, [this]() { 761 RTC_DCHECK_RUN_ON(&sequence_checker_); 762 TimeDelta expected_queue_time = pacer_.ExpectedQueueTime(); 763 control_handler_->SetPacerQueue(expected_queue_time); 764 UpdateControlState(); 765 return kPacerQueueUpdateInterval; 766 }); 767 } 768 controller_task_.Stop(); 769 if (process_interval_.IsFinite()) { 770 controller_task_ = RepeatingTaskHandle::DelayedStart( 771 task_queue_, process_interval_, [this]() { 772 RTC_DCHECK_RUN_ON(&sequence_checker_); 773 UpdateControllerWithTimeInterval(); 774 return process_interval_; 775 }); 776 } 777 } 778 779 void RtpTransportControllerSend::UpdateControllerWithTimeInterval() { 780 RTC_DCHECK(controller_); 781 ProcessInterval msg; 782 msg.at_time = env_.clock().CurrentTime(); 783 if (add_pacing_to_cwin_) 784 msg.pacer_queue = pacer_.QueueSizeData(); 785 PostUpdates(controller_->OnProcessInterval(msg)); 786 } 787 788 void RtpTransportControllerSend::UpdateStreamsConfig() { 789 streams_config_.at_time = env_.clock().CurrentTime(); 790 if (controller_) 791 PostUpdates(controller_->OnStreamsConfig(streams_config_)); 792 } 793 794 void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) { 795 if (update.congestion_window) { 796 congestion_window_size_ = *update.congestion_window; 797 UpdateCongestedState(); 798 } 799 if (update.pacer_config) { 800 pacer_.SetPacingRates(update.pacer_config->data_rate(), 801 update.pacer_config->pad_rate()); 802 } 803 if (!update.probe_cluster_configs.empty()) { 804 pacer_.CreateProbeClusters(std::move(update.probe_cluster_configs)); 805 } 806 if (update.target_rate) { 807 control_handler_->SetTargetRate(*update.target_rate); 808 UpdateControlState(); 809 } 810 } 811 812 void RtpTransportControllerSend::OnReport( 813 Timestamp receive_time, 814 ArrayView<const ReportBlockData> report_blocks) { 815 RTC_DCHECK_RUN_ON(&sequence_checker_); 816 if (report_blocks.empty()) 817 return; 818 819 int total_packets_lost_delta = 0; 820 int total_packets_delta = 0; 821 822 // Compute the packet loss from all report blocks. 823 for (const ReportBlockData& report_block : report_blocks) { 824 auto [it, inserted] = 825 last_report_blocks_.try_emplace(report_block.source_ssrc()); 826 LossReport& last_loss_report = it->second; 827 if (!inserted) { 828 total_packets_delta += report_block.extended_highest_sequence_number() - 829 last_loss_report.extended_highest_sequence_number; 830 total_packets_lost_delta += 831 report_block.cumulative_lost() - last_loss_report.cumulative_lost; 832 } 833 last_loss_report.extended_highest_sequence_number = 834 report_block.extended_highest_sequence_number(); 835 last_loss_report.cumulative_lost = report_block.cumulative_lost(); 836 } 837 // Can only compute delta if there has been previous blocks to compare to. If 838 // not, total_packets_delta will be unchanged and there's nothing more to do. 839 if (!total_packets_delta) 840 return; 841 int packets_received_delta = total_packets_delta - total_packets_lost_delta; 842 // To detect lost packets, at least one packet has to be received. This check 843 // is needed to avoid bandwith detection update in 844 // VideoSendStreamTest.SuspendBelowMinBitrate 845 846 if (packets_received_delta < 1) 847 return; 848 TransportLossReport msg; 849 msg.packets_lost_delta = total_packets_lost_delta; 850 msg.packets_received_delta = packets_received_delta; 851 msg.receive_time = receive_time; 852 msg.start_time = last_report_block_time_; 853 msg.end_time = receive_time; 854 if (controller_) 855 PostUpdates(controller_->OnTransportLossReport(msg)); 856 last_report_block_time_ = receive_time; 857 } 858 859 } // namespace webrtc