rtcp_transceiver_impl.cc (32950B)
1 /* 2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h" 12 13 #include <algorithm> 14 #include <cstddef> 15 #include <cstdint> 16 #include <functional> 17 #include <iterator> 18 #include <memory> 19 #include <optional> 20 #include <string> 21 #include <utility> 22 #include <vector> 23 24 #include "absl/algorithm/container.h" 25 #include "api/array_view.h" 26 #include "api/rtp_headers.h" 27 #include "api/task_queue/task_queue_base.h" 28 #include "api/units/data_rate.h" 29 #include "api/units/time_delta.h" 30 #include "api/units/timestamp.h" 31 #include "api/video/video_bitrate_allocation.h" 32 #include "api/video/video_codec_constants.h" 33 #include "modules/rtp_rtcp/include/receive_statistics.h" 34 #include "modules/rtp_rtcp/include/report_block_data.h" 35 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 36 #include "modules/rtp_rtcp/source/ntp_time_util.h" 37 #include "modules/rtp_rtcp/source/rtcp_packet.h" 38 #include "modules/rtp_rtcp/source/rtcp_packet/bye.h" 39 #include "modules/rtp_rtcp/source/rtcp_packet/common_header.h" 40 #include "modules/rtp_rtcp/source/rtcp_packet/congestion_control_feedback.h" 41 #include "modules/rtp_rtcp/source/rtcp_packet/dlrr.h" 42 #include "modules/rtp_rtcp/source/rtcp_packet/extended_reports.h" 43 #include "modules/rtp_rtcp/source/rtcp_packet/fir.h" 44 #include "modules/rtp_rtcp/source/rtcp_packet/nack.h" 45 #include "modules/rtp_rtcp/source/rtcp_packet/pli.h" 46 #include "modules/rtp_rtcp/source/rtcp_packet/psfb.h" 47 #include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h" 48 #include "modules/rtp_rtcp/source/rtcp_packet/remb.h" 49 #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h" 50 #include "modules/rtp_rtcp/source/rtcp_packet/rrtr.h" 51 #include "modules/rtp_rtcp/source/rtcp_packet/rtpfb.h" 52 #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h" 53 #include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h" 54 #include "modules/rtp_rtcp/source/rtcp_packet/target_bitrate.h" 55 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" 56 #include "modules/rtp_rtcp/source/rtcp_transceiver_config.h" 57 #include "rtc_base/checks.h" 58 #include "rtc_base/containers/flat_map.h" 59 #include "rtc_base/logging.h" 60 #include "rtc_base/numerics/divide_round.h" 61 #include "rtc_base/task_utils/repeating_task.h" 62 #include "system_wrappers/include/clock.h" 63 #include "system_wrappers/include/ntp_time.h" 64 65 namespace webrtc { 66 namespace { 67 68 struct SenderReportTimes { 69 Timestamp local_received_time; 70 NtpTime remote_sent_time; 71 }; 72 73 std::function<void(ArrayView<const uint8_t>)> GetRtcpTransport( 74 const RtcpTransceiverConfig& config) { 75 if (config.rtcp_transport != nullptr) { 76 return config.rtcp_transport; 77 } 78 79 bool first = true; 80 std::string log_prefix = config.debug_id; 81 return [first, log_prefix](ArrayView<const uint8_t> /* packet */) mutable { 82 if (first) { 83 RTC_LOG(LS_ERROR) << log_prefix << "Sending RTCP packets is disabled."; 84 first = false; 85 } 86 }; 87 } 88 89 } // namespace 90 91 struct RtcpTransceiverImpl::RemoteSenderState { 92 uint8_t fir_sequence_number = 0; 93 std::optional<SenderReportTimes> last_received_sender_report; 94 std::vector<MediaReceiverRtcpObserver*> observers; 95 }; 96 97 struct RtcpTransceiverImpl::LocalSenderState { 98 uint32_t ssrc; 99 size_t last_num_sent_bytes = 0; 100 ReportBlockData report_block; 101 // Sequence number of the last FIR message per sender SSRC. 102 flat_map<uint32_t, uint8_t> last_fir; 103 RtpStreamRtcpHandler* handler = nullptr; 104 }; 105 106 // Helper to put several RTCP packets into lower layer datagram composing 107 // Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2. 108 // TODO(bugs.webrtc.org/8239): When in compound mode and packets are so many 109 // that several compound RTCP packets need to be generated, ensure each packet 110 // is compound. 111 class RtcpTransceiverImpl::PacketSender { 112 public: 113 PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback, 114 size_t max_packet_size) 115 : callback_(callback), max_packet_size_(max_packet_size) { 116 RTC_CHECK_LE(max_packet_size, IP_PACKET_SIZE); 117 } 118 ~PacketSender() { RTC_DCHECK_EQ(index_, 0) << "Unsent rtcp packet."; } 119 120 // Appends a packet to pending compound packet. 121 // Sends rtcp compound packet if buffer was already full and resets buffer. 122 void AppendPacket(const rtcp::RtcpPacket& packet) { 123 packet.Create(buffer_, &index_, max_packet_size_, callback_); 124 } 125 126 // Sends pending rtcp compound packet. 127 void Send() { 128 if (index_ > 0) { 129 callback_(ArrayView<const uint8_t>(buffer_, index_)); 130 index_ = 0; 131 } 132 } 133 134 bool IsEmpty() const { return index_ == 0; } 135 136 private: 137 const rtcp::RtcpPacket::PacketReadyCallback callback_; 138 const size_t max_packet_size_; 139 size_t index_ = 0; 140 uint8_t buffer_[IP_PACKET_SIZE]; 141 }; 142 143 RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config) 144 : config_(config), 145 rtcp_transport_(GetRtcpTransport(config_)), 146 ready_to_send_(config.initial_ready_to_send) { 147 RTC_CHECK(config_.Validate()); 148 if (ready_to_send_ && config_.schedule_periodic_compound_packets) { 149 SchedulePeriodicCompoundPackets(config_.initial_report_delay); 150 } 151 } 152 153 RtcpTransceiverImpl::~RtcpTransceiverImpl() = default; 154 155 void RtcpTransceiverImpl::AddMediaReceiverRtcpObserver( 156 uint32_t remote_ssrc, 157 MediaReceiverRtcpObserver* observer) { 158 if (config_.receive_statistics == nullptr && remote_senders_.empty()) { 159 RTC_LOG(LS_WARNING) << config_.debug_id 160 << "receive statistic is not set. RTCP report blocks " 161 "will not be generated."; 162 } 163 auto& stored = remote_senders_[remote_ssrc].observers; 164 RTC_DCHECK(!absl::c_linear_search(stored, observer)); 165 stored.push_back(observer); 166 } 167 168 void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver( 169 uint32_t remote_ssrc, 170 MediaReceiverRtcpObserver* observer) { 171 auto remote_sender_it = remote_senders_.find(remote_ssrc); 172 if (remote_sender_it == remote_senders_.end()) 173 return; 174 auto& stored = remote_sender_it->second.observers; 175 auto it = absl::c_find(stored, observer); 176 if (it == stored.end()) 177 return; 178 stored.erase(it); 179 } 180 181 bool RtcpTransceiverImpl::AddMediaSender(uint32_t local_ssrc, 182 RtpStreamRtcpHandler* handler) { 183 RTC_DCHECK(handler != nullptr); 184 LocalSenderState state; 185 state.ssrc = local_ssrc; 186 state.handler = handler; 187 local_senders_.push_back(state); 188 auto it = std::prev(local_senders_.end()); 189 auto [unused, inserted] = local_senders_by_ssrc_.emplace(local_ssrc, it); 190 if (!inserted) { 191 local_senders_.pop_back(); 192 return false; 193 } 194 return true; 195 } 196 197 bool RtcpTransceiverImpl::RemoveMediaSender(uint32_t local_ssrc) { 198 auto index_it = local_senders_by_ssrc_.find(local_ssrc); 199 if (index_it == local_senders_by_ssrc_.end()) { 200 return false; 201 } 202 local_senders_.erase(index_it->second); 203 local_senders_by_ssrc_.erase(index_it); 204 return true; 205 } 206 207 void RtcpTransceiverImpl::SetReadyToSend(bool ready) { 208 if (config_.schedule_periodic_compound_packets) { 209 if (ready_to_send_ && !ready) 210 periodic_task_handle_.Stop(); 211 212 if (!ready_to_send_ && ready) // Restart periodic sending. 213 SchedulePeriodicCompoundPackets(config_.report_period / 2); 214 } 215 ready_to_send_ = ready; 216 } 217 218 void RtcpTransceiverImpl::ReceivePacket(ArrayView<const uint8_t> packet, 219 Timestamp now) { 220 // Report blocks may be spread across multiple sender and receiver reports. 221 std::vector<ReportBlockData> report_blocks; 222 223 while (!packet.empty()) { 224 rtcp::CommonHeader rtcp_block; 225 if (!rtcp_block.Parse(packet.data(), packet.size())) 226 break; 227 228 HandleReceivedPacket(rtcp_block, now, report_blocks); 229 230 packet = packet.subview(rtcp_block.packet_size()); 231 } 232 233 if (!report_blocks.empty()) { 234 ProcessReportBlocks(now, report_blocks); 235 } 236 } 237 238 void RtcpTransceiverImpl::SendCompoundPacket() { 239 if (!ready_to_send_) 240 return; 241 SendPeriodicCompoundPacket(); 242 ReschedulePeriodicCompoundPackets(); 243 } 244 245 void RtcpTransceiverImpl::SetRemb(int64_t bitrate_bps, 246 std::vector<uint32_t> ssrcs) { 247 RTC_DCHECK_GE(bitrate_bps, 0); 248 249 bool send_now = config_.send_remb_on_change && 250 (!remb_.has_value() || bitrate_bps != remb_->bitrate_bps()); 251 remb_.emplace(); 252 remb_->SetSsrcs(std::move(ssrcs)); 253 remb_->SetBitrateBps(bitrate_bps); 254 remb_->SetSenderSsrc(config_.feedback_ssrc); 255 // TODO(bugs.webrtc.org/8239): Move logic from PacketRouter for sending remb 256 // immideately on large bitrate change when there is one RtcpTransceiver per 257 // rtp transport. 258 if (send_now) { 259 std::optional<rtcp::Remb> remb; 260 remb.swap(remb_); 261 SendImmediateFeedback(*remb); 262 remb.swap(remb_); 263 } 264 } 265 266 void RtcpTransceiverImpl::UnsetRemb() { 267 remb_.reset(); 268 } 269 270 void RtcpTransceiverImpl::SendNack(uint32_t ssrc, 271 std::vector<uint16_t> sequence_numbers) { 272 RTC_DCHECK(!sequence_numbers.empty()); 273 if (!ready_to_send_) 274 return; 275 rtcp::Nack nack; 276 nack.SetSenderSsrc(config_.feedback_ssrc); 277 nack.SetMediaSsrc(ssrc); 278 nack.SetPacketIds(std::move(sequence_numbers)); 279 SendImmediateFeedback(nack); 280 } 281 282 void RtcpTransceiverImpl::SendPictureLossIndication(uint32_t ssrc) { 283 if (!ready_to_send_) 284 return; 285 rtcp::Pli pli; 286 pli.SetSenderSsrc(config_.feedback_ssrc); 287 pli.SetMediaSsrc(ssrc); 288 SendImmediateFeedback(pli); 289 } 290 291 void RtcpTransceiverImpl::SendFullIntraRequest(ArrayView<const uint32_t> ssrcs, 292 bool new_request) { 293 RTC_DCHECK(!ssrcs.empty()); 294 if (!ready_to_send_) 295 return; 296 rtcp::Fir fir; 297 fir.SetSenderSsrc(config_.feedback_ssrc); 298 for (uint32_t media_ssrc : ssrcs) { 299 uint8_t& command_seq_num = remote_senders_[media_ssrc].fir_sequence_number; 300 if (new_request) 301 command_seq_num += 1; 302 fir.AddRequestTo(media_ssrc, command_seq_num); 303 } 304 SendImmediateFeedback(fir); 305 } 306 307 void RtcpTransceiverImpl::HandleReceivedPacket( 308 const rtcp::CommonHeader& rtcp_packet_header, 309 Timestamp now, 310 std::vector<ReportBlockData>& report_blocks) { 311 switch (rtcp_packet_header.type()) { 312 case rtcp::Bye::kPacketType: 313 HandleBye(rtcp_packet_header); 314 break; 315 case rtcp::SenderReport::kPacketType: 316 HandleSenderReport(rtcp_packet_header, now, report_blocks); 317 break; 318 case rtcp::ReceiverReport::kPacketType: 319 HandleReceiverReport(rtcp_packet_header, now, report_blocks); 320 break; 321 case rtcp::ExtendedReports::kPacketType: 322 HandleExtendedReports(rtcp_packet_header, now); 323 break; 324 case rtcp::Psfb::kPacketType: 325 HandlePayloadSpecificFeedback(rtcp_packet_header, now); 326 break; 327 case rtcp::Rtpfb::kPacketType: 328 HandleRtpFeedback(rtcp_packet_header, now); 329 break; 330 } 331 } 332 333 void RtcpTransceiverImpl::HandleBye( 334 const rtcp::CommonHeader& rtcp_packet_header) { 335 rtcp::Bye bye; 336 if (!bye.Parse(rtcp_packet_header)) 337 return; 338 auto remote_sender_it = remote_senders_.find(bye.sender_ssrc()); 339 if (remote_sender_it == remote_senders_.end()) 340 return; 341 for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers) 342 observer->OnBye(bye.sender_ssrc()); 343 } 344 345 void RtcpTransceiverImpl::HandleSenderReport( 346 const rtcp::CommonHeader& rtcp_packet_header, 347 Timestamp now, 348 std::vector<ReportBlockData>& report_blocks) { 349 rtcp::SenderReport sender_report; 350 if (!sender_report.Parse(rtcp_packet_header)) 351 return; 352 RemoteSenderState& remote_sender = 353 remote_senders_[sender_report.sender_ssrc()]; 354 remote_sender.last_received_sender_report = { 355 {.local_received_time = now, .remote_sent_time = sender_report.ntp()}}; 356 HandleReportBlocks(sender_report.sender_ssrc(), now, 357 sender_report.report_blocks(), report_blocks); 358 359 for (MediaReceiverRtcpObserver* observer : remote_sender.observers) { 360 observer->OnSenderReport(sender_report.sender_ssrc(), sender_report.ntp(), 361 sender_report.rtp_timestamp()); 362 } 363 } 364 365 void RtcpTransceiverImpl::HandleReceiverReport( 366 const rtcp::CommonHeader& rtcp_packet_header, 367 Timestamp now, 368 std::vector<ReportBlockData>& report_blocks) { 369 rtcp::ReceiverReport receiver_report; 370 if (!receiver_report.Parse(rtcp_packet_header)) { 371 return; 372 } 373 HandleReportBlocks(receiver_report.sender_ssrc(), now, 374 receiver_report.report_blocks(), report_blocks); 375 } 376 377 void RtcpTransceiverImpl::HandleReportBlocks( 378 uint32_t sender_ssrc, 379 Timestamp now, 380 ArrayView<const rtcp::ReportBlock> rtcp_report_blocks, 381 std::vector<ReportBlockData>& report_blocks) { 382 if (rtcp_report_blocks.empty()) { 383 return; 384 } 385 NtpTime now_ntp = config_.clock->ConvertTimestampToNtpTime(now); 386 uint32_t receive_time_ntp = CompactNtp(now_ntp); 387 Timestamp now_utc = Clock::NtpToUtc(now_ntp); 388 389 for (const rtcp::ReportBlock& block : rtcp_report_blocks) { 390 std::optional<TimeDelta> rtt; 391 if (block.last_sr() != 0) { 392 rtt = CompactNtpRttToTimeDelta( 393 receive_time_ntp - block.delay_since_last_sr() - block.last_sr()); 394 } 395 396 auto sender_it = local_senders_by_ssrc_.find(block.source_ssrc()); 397 if (sender_it != local_senders_by_ssrc_.end()) { 398 LocalSenderState& state = *sender_it->second; 399 state.report_block.SetReportBlock(sender_ssrc, block, now_utc, now); 400 if (rtt.has_value()) { 401 state.report_block.AddRoundTripTimeSample(*rtt); 402 } 403 state.handler->OnReport(state.report_block); 404 report_blocks.push_back(state.report_block); 405 } else { 406 // No registered sender for this report block, still report it to the 407 // network link. 408 ReportBlockData report_block; 409 report_block.SetReportBlock(sender_ssrc, block, now_utc, now); 410 if (rtt.has_value()) { 411 report_block.AddRoundTripTimeSample(*rtt); 412 } 413 report_blocks.push_back(report_block); 414 } 415 } 416 } 417 418 void RtcpTransceiverImpl::HandlePayloadSpecificFeedback( 419 const rtcp::CommonHeader& rtcp_packet_header, 420 Timestamp now) { 421 switch (rtcp_packet_header.fmt()) { 422 case rtcp::Fir::kFeedbackMessageType: 423 HandleFir(rtcp_packet_header); 424 break; 425 case rtcp::Pli::kFeedbackMessageType: 426 HandlePli(rtcp_packet_header); 427 break; 428 case rtcp::Psfb::kAfbMessageType: 429 HandleRemb(rtcp_packet_header, now); 430 break; 431 } 432 } 433 434 void RtcpTransceiverImpl::HandleFir( 435 const rtcp::CommonHeader& rtcp_packet_header) { 436 rtcp::Fir fir; 437 if (local_senders_.empty() || !fir.Parse(rtcp_packet_header)) { 438 return; 439 } 440 for (const rtcp::Fir::Request& r : fir.requests()) { 441 auto it = local_senders_by_ssrc_.find(r.ssrc); 442 if (it == local_senders_by_ssrc_.end()) { 443 continue; 444 } 445 auto [fir_it, is_new] = 446 it->second->last_fir.emplace(fir.sender_ssrc(), r.seq_nr); 447 if (is_new || fir_it->second != r.seq_nr) { 448 it->second->handler->OnFir(fir.sender_ssrc()); 449 fir_it->second = r.seq_nr; 450 } 451 } 452 } 453 454 void RtcpTransceiverImpl::HandlePli( 455 const rtcp::CommonHeader& rtcp_packet_header) { 456 rtcp::Pli pli; 457 if (local_senders_.empty() || !pli.Parse(rtcp_packet_header)) { 458 return; 459 } 460 auto it = local_senders_by_ssrc_.find(pli.media_ssrc()); 461 if (it != local_senders_by_ssrc_.end()) { 462 it->second->handler->OnPli(pli.sender_ssrc()); 463 } 464 } 465 466 void RtcpTransceiverImpl::HandleRemb( 467 const rtcp::CommonHeader& rtcp_packet_header, 468 Timestamp now) { 469 rtcp::Remb remb; 470 if (config_.network_link_observer == nullptr || 471 !remb.Parse(rtcp_packet_header)) { 472 return; 473 } 474 config_.network_link_observer->OnReceiverEstimatedMaxBitrate( 475 now, DataRate::BitsPerSec(remb.bitrate_bps())); 476 } 477 478 void RtcpTransceiverImpl::HandleRtpFeedback( 479 const rtcp::CommonHeader& rtcp_packet_header, 480 Timestamp now) { 481 switch (rtcp_packet_header.fmt()) { 482 case rtcp::Nack::kFeedbackMessageType: 483 HandleNack(rtcp_packet_header); 484 break; 485 case rtcp::TransportFeedback::kFeedbackMessageType: 486 HandleTransportFeedback(rtcp_packet_header, now); 487 break; 488 case rtcp::CongestionControlFeedback::kFeedbackMessageType: 489 HandleCongestionControlFeedback(rtcp_packet_header, now); 490 break; 491 } 492 } 493 494 void RtcpTransceiverImpl::HandleNack( 495 const rtcp::CommonHeader& rtcp_packet_header) { 496 rtcp::Nack nack; 497 if (local_senders_.empty() || !nack.Parse(rtcp_packet_header)) { 498 return; 499 } 500 auto it = local_senders_by_ssrc_.find(nack.media_ssrc()); 501 if (it != local_senders_by_ssrc_.end()) { 502 it->second->handler->OnNack(nack.sender_ssrc(), nack.packet_ids()); 503 } 504 } 505 506 void RtcpTransceiverImpl::HandleTransportFeedback( 507 const rtcp::CommonHeader& rtcp_packet_header, 508 Timestamp now) { 509 RTC_DCHECK_EQ(rtcp_packet_header.fmt(), 510 rtcp::TransportFeedback::kFeedbackMessageType); 511 if (config_.network_link_observer == nullptr) { 512 return; 513 } 514 rtcp::TransportFeedback feedback; 515 if (feedback.Parse(rtcp_packet_header)) { 516 config_.network_link_observer->OnTransportFeedback(now, feedback); 517 } 518 } 519 520 void RtcpTransceiverImpl::HandleCongestionControlFeedback( 521 const rtcp::CommonHeader& rtcp_packet_header, 522 Timestamp now) { 523 RTC_DCHECK_EQ(rtcp_packet_header.fmt(), 524 rtcp::CongestionControlFeedback::kFeedbackMessageType); 525 if (config_.network_link_observer == nullptr) { 526 return; 527 } 528 rtcp::CongestionControlFeedback feedback; 529 if (feedback.Parse(rtcp_packet_header)) { 530 config_.network_link_observer->OnCongestionControlFeedback(now, feedback); 531 } 532 } 533 534 void RtcpTransceiverImpl::HandleExtendedReports( 535 const rtcp::CommonHeader& rtcp_packet_header, 536 Timestamp now) { 537 rtcp::ExtendedReports extended_reports; 538 if (!extended_reports.Parse(rtcp_packet_header)) 539 return; 540 541 if (config_.reply_to_non_sender_rtt_measurement && extended_reports.rrtr()) { 542 RrtrTimes& rrtr = received_rrtrs_[extended_reports.sender_ssrc()]; 543 rrtr.received_remote_mid_ntp_time = 544 CompactNtp(extended_reports.rrtr()->ntp()); 545 rrtr.local_receive_mid_ntp_time = 546 CompactNtp(config_.clock->ConvertTimestampToNtpTime(now)); 547 } 548 549 if (extended_reports.dlrr()) 550 HandleDlrr(extended_reports.dlrr(), now); 551 552 if (extended_reports.target_bitrate()) 553 HandleTargetBitrate(*extended_reports.target_bitrate(), 554 extended_reports.sender_ssrc()); 555 } 556 557 void RtcpTransceiverImpl::HandleDlrr(const rtcp::Dlrr& dlrr, Timestamp now) { 558 if (!config_.non_sender_rtt_measurement || 559 config_.network_link_observer == nullptr) { 560 return; 561 } 562 563 // Delay and last_rr are transferred using 32bit compact ntp resolution. 564 // Convert packet arrival time to same format through 64bit ntp format. 565 uint32_t receive_time_ntp = 566 CompactNtp(config_.clock->ConvertTimestampToNtpTime(now)); 567 for (const rtcp::ReceiveTimeInfo& rti : dlrr.sub_blocks()) { 568 if (rti.ssrc != config_.feedback_ssrc) 569 continue; 570 uint32_t rtt_ntp = receive_time_ntp - rti.delay_since_last_rr - rti.last_rr; 571 TimeDelta rtt = CompactNtpRttToTimeDelta(rtt_ntp); 572 config_.network_link_observer->OnRttUpdate(now, rtt); 573 } 574 } 575 576 void RtcpTransceiverImpl::ProcessReportBlocks( 577 Timestamp now, 578 ArrayView<const ReportBlockData> report_blocks) { 579 RTC_DCHECK(!report_blocks.empty()); 580 if (config_.network_link_observer == nullptr) { 581 return; 582 } 583 // Round trip time calculated from different report blocks suppose to be about 584 // the same, as those blocks should be generated by the same remote sender. 585 // To avoid too many callbacks, this code accumulate multiple rtts into one. 586 TimeDelta rtt_sum = TimeDelta::Zero(); 587 size_t num_rtts = 0; 588 for (const ReportBlockData& report_block : report_blocks) { 589 if (report_block.has_rtt()) { 590 rtt_sum += report_block.last_rtt(); 591 ++num_rtts; 592 } 593 } 594 if (num_rtts > 0) { 595 config_.network_link_observer->OnRttUpdate(now, rtt_sum / num_rtts); 596 } 597 config_.network_link_observer->OnReport(now, report_blocks); 598 } 599 600 void RtcpTransceiverImpl::HandleTargetBitrate( 601 const rtcp::TargetBitrate& target_bitrate, 602 uint32_t remote_ssrc) { 603 auto remote_sender_it = remote_senders_.find(remote_ssrc); 604 if (remote_sender_it == remote_senders_.end() || 605 remote_sender_it->second.observers.empty()) 606 return; 607 608 // Convert rtcp::TargetBitrate to VideoBitrateAllocation. 609 VideoBitrateAllocation bitrate_allocation; 610 for (const rtcp::TargetBitrate::BitrateItem& item : 611 target_bitrate.GetTargetBitrates()) { 612 if (item.spatial_layer >= kMaxSpatialLayers || 613 item.temporal_layer >= kMaxTemporalStreams) { 614 RTC_DLOG(LS_WARNING) 615 << config_.debug_id 616 << "Invalid incoming TargetBitrate with spatial layer " 617 << item.spatial_layer << ", temporal layer " << item.temporal_layer; 618 continue; 619 } 620 bitrate_allocation.SetBitrate(item.spatial_layer, item.temporal_layer, 621 item.target_bitrate_kbps * 1000); 622 } 623 624 for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers) 625 observer->OnBitrateAllocation(remote_ssrc, bitrate_allocation); 626 } 627 628 void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() { 629 if (!config_.schedule_periodic_compound_packets) 630 return; 631 periodic_task_handle_.Stop(); 632 RTC_DCHECK(ready_to_send_); 633 SchedulePeriodicCompoundPackets(config_.report_period); 634 } 635 636 void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(TimeDelta delay) { 637 periodic_task_handle_ = RepeatingTaskHandle::DelayedStart( 638 config_.task_queue, delay, 639 [this] { 640 RTC_DCHECK(config_.schedule_periodic_compound_packets); 641 RTC_DCHECK(ready_to_send_); 642 SendPeriodicCompoundPacket(); 643 return config_.report_period; 644 }, 645 TaskQueueBase::DelayPrecision::kLow, config_.clock); 646 } 647 648 std::vector<uint32_t> RtcpTransceiverImpl::FillReports( 649 Timestamp now, 650 ReservedBytes reserved, 651 PacketSender& rtcp_sender) { 652 // Sender/receiver reports should be first in the RTCP packet. 653 RTC_DCHECK(rtcp_sender.IsEmpty()); 654 655 size_t available_bytes = config_.max_packet_size; 656 if (reserved.per_packet > available_bytes) { 657 // Because reserved.per_packet is unsigned, substracting would underflow and 658 // will not produce desired result. 659 available_bytes = 0; 660 } else { 661 available_bytes -= reserved.per_packet; 662 } 663 664 const size_t sender_report_size_bytes = 28 + reserved.per_sender; 665 const size_t full_sender_report_size_bytes = 666 sender_report_size_bytes + 667 rtcp::SenderReport::kMaxNumberOfReportBlocks * rtcp::ReportBlock::kLength; 668 size_t max_full_sender_reports = 669 available_bytes / full_sender_report_size_bytes; 670 size_t max_report_blocks = 671 max_full_sender_reports * rtcp::SenderReport::kMaxNumberOfReportBlocks; 672 size_t available_bytes_for_last_sender_report = 673 available_bytes - max_full_sender_reports * full_sender_report_size_bytes; 674 if (available_bytes_for_last_sender_report >= sender_report_size_bytes) { 675 max_report_blocks += 676 (available_bytes_for_last_sender_report - sender_report_size_bytes) / 677 rtcp::ReportBlock::kLength; 678 } 679 680 std::vector<rtcp::ReportBlock> report_blocks = 681 CreateReportBlocks(now, max_report_blocks); 682 // Previous calculation of max number of sender report made space for max 683 // number of report blocks per sender report, but if number of report blocks 684 // is low, more sender reports may fit in. 685 size_t max_sender_reports = 686 (available_bytes - report_blocks.size() * rtcp::ReportBlock::kLength) / 687 sender_report_size_bytes; 688 689 auto last_handled_sender_it = local_senders_.end(); 690 auto report_block_it = report_blocks.begin(); 691 std::vector<uint32_t> sender_ssrcs; 692 for (auto it = local_senders_.begin(); 693 it != local_senders_.end() && sender_ssrcs.size() < max_sender_reports; 694 ++it) { 695 LocalSenderState& rtp_sender = *it; 696 RtpStreamRtcpHandler::RtpStats stats = rtp_sender.handler->SentStats(); 697 698 if (stats.num_sent_bytes() < rtp_sender.last_num_sent_bytes) { 699 RTC_LOG(LS_ERROR) << "Inconsistent SR for SSRC " << rtp_sender.ssrc 700 << ". Number of total sent bytes decreased."; 701 rtp_sender.last_num_sent_bytes = 0; 702 } 703 if (stats.num_sent_bytes() == rtp_sender.last_num_sent_bytes) { 704 // Skip because no RTP packet was send for this SSRC since last report. 705 continue; 706 } 707 rtp_sender.last_num_sent_bytes = stats.num_sent_bytes(); 708 709 last_handled_sender_it = it; 710 rtcp::SenderReport sender_report; 711 sender_report.SetSenderSsrc(rtp_sender.ssrc); 712 sender_report.SetPacketCount(stats.num_sent_packets()); 713 sender_report.SetOctetCount(stats.num_sent_bytes()); 714 sender_report.SetNtp(config_.clock->ConvertTimestampToNtpTime(now)); 715 RTC_DCHECK_GE(now, stats.last_capture_time()); 716 sender_report.SetRtpTimestamp( 717 stats.last_rtp_timestamp() + 718 ((now - stats.last_capture_time()) * stats.last_clock_rate()) 719 .seconds()); 720 if (report_block_it != report_blocks.end()) { 721 size_t num_blocks = 722 std::min<size_t>(rtcp::SenderReport::kMaxNumberOfReportBlocks, 723 report_blocks.end() - report_block_it); 724 std::vector<rtcp::ReportBlock> sub_blocks(report_block_it, 725 report_block_it + num_blocks); 726 sender_report.SetReportBlocks(std::move(sub_blocks)); 727 report_block_it += num_blocks; 728 } 729 rtcp_sender.AppendPacket(sender_report); 730 sender_ssrcs.push_back(rtp_sender.ssrc); 731 } 732 if (last_handled_sender_it != local_senders_.end()) { 733 // Rotate `local_senders_` so that the 1st unhandled sender become first in 734 // the list, and thus will be first to generate rtcp sender report for on 735 // the next call to `FillReports`. 736 local_senders_.splice(local_senders_.end(), local_senders_, 737 local_senders_.begin(), 738 std::next(last_handled_sender_it)); 739 } 740 741 // Calculcate number of receiver reports to attach remaining report blocks to. 742 size_t num_receiver_reports = 743 DivideRoundUp(report_blocks.end() - report_block_it, 744 rtcp::ReceiverReport::kMaxNumberOfReportBlocks); 745 746 // In compound mode each RTCP packet has to start with a sender or receiver 747 // report. 748 if (config_.rtcp_mode == RtcpMode::kCompound && sender_ssrcs.empty() && 749 num_receiver_reports == 0) { 750 num_receiver_reports = 1; 751 } 752 753 uint32_t sender_ssrc = 754 sender_ssrcs.empty() ? config_.feedback_ssrc : sender_ssrcs.front(); 755 for (size_t i = 0; i < num_receiver_reports; ++i) { 756 rtcp::ReceiverReport receiver_report; 757 receiver_report.SetSenderSsrc(sender_ssrc); 758 size_t num_blocks = 759 std::min<size_t>(rtcp::ReceiverReport::kMaxNumberOfReportBlocks, 760 report_blocks.end() - report_block_it); 761 std::vector<rtcp::ReportBlock> sub_blocks(report_block_it, 762 report_block_it + num_blocks); 763 receiver_report.SetReportBlocks(std::move(sub_blocks)); 764 report_block_it += num_blocks; 765 rtcp_sender.AppendPacket(receiver_report); 766 } 767 // All report blocks should be attached at this point. 768 RTC_DCHECK_EQ(report_blocks.end() - report_block_it, 0); 769 return sender_ssrcs; 770 } 771 772 void RtcpTransceiverImpl::CreateCompoundPacket(Timestamp now, 773 size_t reserved_bytes, 774 PacketSender& sender) { 775 RTC_DCHECK(sender.IsEmpty()); 776 ReservedBytes reserved = {.per_packet = reserved_bytes}; 777 std::optional<rtcp::Sdes> sdes; 778 if (!config_.cname.empty()) { 779 sdes.emplace(); 780 bool added = sdes->AddCName(config_.feedback_ssrc, config_.cname); 781 RTC_DCHECK(added) << "Failed to add CNAME " << config_.cname 782 << " to RTCP SDES packet."; 783 reserved.per_packet += sdes->BlockLength(); 784 } 785 if (remb_.has_value()) { 786 reserved.per_packet += remb_->BlockLength(); 787 } 788 std::optional<rtcp::ExtendedReports> xr_with_dlrr; 789 if (!received_rrtrs_.empty()) { 790 RTC_DCHECK(config_.reply_to_non_sender_rtt_measurement); 791 xr_with_dlrr.emplace(); 792 uint32_t now_ntp = 793 CompactNtp(config_.clock->ConvertTimestampToNtpTime(now)); 794 for (const auto& [ssrc, rrtr_info] : received_rrtrs_) { 795 rtcp::ReceiveTimeInfo reply; 796 reply.ssrc = ssrc; 797 reply.last_rr = rrtr_info.received_remote_mid_ntp_time; 798 reply.delay_since_last_rr = 799 now_ntp - rrtr_info.local_receive_mid_ntp_time; 800 xr_with_dlrr->AddDlrrItem(reply); 801 } 802 if (config_.reply_to_non_sender_rtt_mesaurments_on_all_ssrcs) { 803 reserved.per_sender += xr_with_dlrr->BlockLength(); 804 } else { 805 reserved.per_packet += xr_with_dlrr->BlockLength(); 806 } 807 } 808 if (config_.non_sender_rtt_measurement) { 809 // It looks like bytes for ExtendedReport header are reserved twice, but in 810 // practice the same RtcpTransceiver won't both produce RRTR (i.e. it is a 811 // receiver-only) and reply to RRTR (i.e. remote participant is a receiver 812 // only). If that happen, then `reserved_bytes` would be slightly larger 813 // than it should, which is not an issue. 814 815 // 4 bytes for common RTCP header + 4 bytes for the ExtenedReports header. 816 reserved.per_packet += (4 + 4 + rtcp::Rrtr::kLength); 817 } 818 819 std::vector<uint32_t> sender_ssrcs = FillReports(now, reserved, sender); 820 bool has_sender_report = !sender_ssrcs.empty(); 821 uint32_t sender_ssrc = 822 has_sender_report ? sender_ssrcs.front() : config_.feedback_ssrc; 823 824 if (sdes.has_value() && !sender.IsEmpty()) { 825 sender.AppendPacket(*sdes); 826 } 827 if (remb_.has_value()) { 828 remb_->SetSenderSsrc(sender_ssrc); 829 sender.AppendPacket(*remb_); 830 } 831 if (!has_sender_report && config_.non_sender_rtt_measurement) { 832 rtcp::ExtendedReports xr_with_rrtr; 833 xr_with_rrtr.SetSenderSsrc(config_.feedback_ssrc); 834 rtcp::Rrtr rrtr; 835 rrtr.SetNtp(config_.clock->ConvertTimestampToNtpTime(now)); 836 xr_with_rrtr.SetRrtr(rrtr); 837 sender.AppendPacket(xr_with_rrtr); 838 } 839 if (xr_with_dlrr.has_value()) { 840 ArrayView<const uint32_t> ssrcs(&sender_ssrc, 1); 841 if (config_.reply_to_non_sender_rtt_mesaurments_on_all_ssrcs && 842 !sender_ssrcs.empty()) { 843 ssrcs = sender_ssrcs; 844 } 845 RTC_DCHECK(!ssrcs.empty()); 846 for (uint32_t ssrc : ssrcs) { 847 xr_with_dlrr->SetSenderSsrc(ssrc); 848 sender.AppendPacket(*xr_with_dlrr); 849 } 850 } 851 } 852 853 void RtcpTransceiverImpl::SendPeriodicCompoundPacket() { 854 Timestamp now = config_.clock->CurrentTime(); 855 PacketSender sender(rtcp_transport_, config_.max_packet_size); 856 CreateCompoundPacket(now, /*reserved_bytes=*/0, sender); 857 sender.Send(); 858 } 859 860 void RtcpTransceiverImpl::SendCombinedRtcpPacket( 861 std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets) { 862 PacketSender sender(rtcp_transport_, config_.max_packet_size); 863 864 for (auto& rtcp_packet : rtcp_packets) { 865 rtcp_packet->SetSenderSsrc(config_.feedback_ssrc); 866 sender.AppendPacket(*rtcp_packet); 867 } 868 sender.Send(); 869 } 870 871 void RtcpTransceiverImpl::SendImmediateFeedback( 872 const rtcp::RtcpPacket& rtcp_packet) { 873 PacketSender sender(rtcp_transport_, config_.max_packet_size); 874 // Compound mode requires every sent rtcp packet to be compound, i.e. start 875 // with a sender or receiver report. 876 if (config_.rtcp_mode == RtcpMode::kCompound) { 877 Timestamp now = config_.clock->CurrentTime(); 878 CreateCompoundPacket(now, /*reserved_bytes=*/rtcp_packet.BlockLength(), 879 sender); 880 } 881 882 sender.AppendPacket(rtcp_packet); 883 sender.Send(); 884 885 // If compound packet was sent, delay (reschedule) the periodic one. 886 if (config_.rtcp_mode == RtcpMode::kCompound) 887 ReschedulePeriodicCompoundPackets(); 888 } 889 890 std::vector<rtcp::ReportBlock> RtcpTransceiverImpl::CreateReportBlocks( 891 Timestamp now, 892 size_t num_max_blocks) { 893 if (!config_.receive_statistics) 894 return {}; 895 std::vector<rtcp::ReportBlock> report_blocks = 896 config_.receive_statistics->RtcpReportBlocks(num_max_blocks); 897 uint32_t last_sr = 0; 898 uint32_t last_delay = 0; 899 for (rtcp::ReportBlock& report_block : report_blocks) { 900 auto it = remote_senders_.find(report_block.source_ssrc()); 901 if (it == remote_senders_.end() || 902 !it->second.last_received_sender_report) { 903 continue; 904 } 905 const SenderReportTimes& last_sender_report = 906 *it->second.last_received_sender_report; 907 last_sr = CompactNtp(last_sender_report.remote_sent_time); 908 last_delay = 909 SaturatedToCompactNtp(now - last_sender_report.local_received_time); 910 report_block.SetLastSr(last_sr); 911 report_block.SetDelayLastSr(last_delay); 912 } 913 return report_blocks; 914 } 915 916 } // namespace webrtc