rtp_sender_egress.cc (19741B)
1 /* 2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/rtp_rtcp/source/rtp_sender_egress.h" 12 13 #include <algorithm> 14 #include <cstddef> 15 #include <cstdint> 16 #include <memory> 17 #include <optional> 18 #include <utility> 19 #include <vector> 20 21 #include "api/array_view.h" 22 #include "api/call/transport.h" 23 #include "api/environment/environment.h" 24 #include "api/field_trials_view.h" 25 #include "api/rtc_event_log/rtc_event_log.h" 26 #include "api/sequence_checker.h" 27 #include "api/task_queue/pending_task_safety_flag.h" 28 #include "api/task_queue/task_queue_base.h" 29 #include "api/transport/network_types.h" 30 #include "api/units/data_rate.h" 31 #include "api/units/time_delta.h" 32 #include "api/units/timestamp.h" 33 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h" 34 #include "modules/include/module_fec_types.h" 35 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 36 #include "modules/rtp_rtcp/source/packet_sequencer.h" 37 #include "modules/rtp_rtcp/source/rtp_header_extensions.h" 38 #include "modules/rtp_rtcp/source/rtp_packet_history.h" 39 #include "modules/rtp_rtcp/source/rtp_rtcp_interface.h" 40 #include "modules/rtp_rtcp/source/rtp_sequence_number_map.h" 41 #include "rtc_base/bitrate_tracker.h" 42 #include "rtc_base/checks.h" 43 #include "rtc_base/copy_on_write_buffer.h" 44 #include "rtc_base/logging.h" 45 #include "rtc_base/task_utils/repeating_task.h" 46 47 namespace webrtc { 48 namespace { 49 constexpr uint32_t kTimestampTicksPerMs = 90; 50 constexpr TimeDelta kBitrateStatisticsWindow = TimeDelta::Seconds(1); 51 constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13; 52 constexpr TimeDelta kUpdateInterval = kBitrateStatisticsWindow; 53 } // namespace 54 55 RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender( 56 TaskQueueBase& worker_queue, 57 RtpSenderEgress* sender, 58 PacketSequencer* sequencer) 59 : worker_queue_(worker_queue), 60 transport_sequence_number_(0), 61 sender_(sender), 62 sequencer_(sequencer) { 63 RTC_DCHECK(sequencer); 64 } 65 RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() { 66 RTC_DCHECK_RUN_ON(&worker_queue_); 67 } 68 69 void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets( 70 std::vector<std::unique_ptr<RtpPacketToSend>> packets) { 71 if (!worker_queue_.IsCurrent()) { 72 worker_queue_.PostTask(SafeTask( 73 task_safety_.flag(), [this, packets = std::move(packets)]() mutable { 74 EnqueuePackets(std::move(packets)); 75 })); 76 return; 77 } 78 RTC_DCHECK_RUN_ON(&worker_queue_); 79 for (auto& packet : packets) { 80 PrepareForSend(packet.get()); 81 sender_->SendPacket(std::move(packet), PacedPacketInfo()); 82 } 83 auto fec_packets = sender_->FetchFecPackets(); 84 if (!fec_packets.empty()) { 85 EnqueuePackets(std::move(fec_packets)); 86 } 87 } 88 89 void RtpSenderEgress::NonPacedPacketSender::PrepareForSend( 90 RtpPacketToSend* packet) { 91 RTC_DCHECK_RUN_ON(&worker_queue_); 92 // Assign sequence numbers, but not for flexfec which is already running on 93 // an internally maintained sequence number series. 94 if (packet->Ssrc() != sender_->FlexFecSsrc()) { 95 sequencer_->Sequence(*packet); 96 } 97 if (!packet->SetExtension<TransportSequenceNumber>( 98 ++transport_sequence_number_)) { 99 --transport_sequence_number_; 100 } 101 packet->ReserveExtension<TransmissionOffset>(); 102 packet->ReserveExtension<AbsoluteSendTime>(); 103 } 104 105 RtpSenderEgress::RtpSenderEgress(const Environment& env, 106 const RtpRtcpInterface::Configuration& config, 107 RtpPacketHistory* packet_history) 108 : env_(env), 109 enable_send_packet_batching_(config.enable_send_packet_batching), 110 worker_queue_(TaskQueueBase::Current()), 111 ssrc_(config.local_media_ssrc), 112 rtx_ssrc_(config.rtx_send_ssrc), 113 flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc() 114 : std::nullopt), 115 populate_network2_timestamp_(config.populate_network2_timestamp), 116 packet_history_(packet_history), 117 transport_(config.outgoing_transport), 118 is_audio_(config.audio), 119 need_rtp_packet_infos_(config.need_rtp_packet_infos), 120 fec_generator_(config.fec_generator), 121 send_packet_observer_(config.send_packet_observer), 122 rtp_stats_callback_(config.rtp_stats_callback), 123 bitrate_callback_(config.send_bitrate_observer), 124 media_has_been_sent_(false), 125 force_part_of_allocation_(false), 126 timestamp_offset_(0), 127 send_rates_(kNumMediaTypes, BitrateTracker(kBitrateStatisticsWindow)), 128 rtp_sequence_number_map_(need_rtp_packet_infos_ 129 ? std::make_unique<RtpSequenceNumberMap>( 130 kRtpSequenceNumberMapMaxEntries) 131 : nullptr), 132 use_ntp_time_for_absolute_send_time_(!env_.field_trials().IsDisabled( 133 "WebRTC-UseNtpTimeAbsoluteSendTime")) { 134 RTC_DCHECK(worker_queue_); 135 if (bitrate_callback_) { 136 update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_, 137 kUpdateInterval, [this]() { 138 PeriodicUpdate(); 139 return kUpdateInterval; 140 }); 141 } 142 } 143 144 RtpSenderEgress::~RtpSenderEgress() { 145 RTC_DCHECK_RUN_ON(worker_queue_); 146 update_task_.Stop(); 147 } 148 149 void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet, 150 const PacedPacketInfo& pacing_info) { 151 RTC_DCHECK_RUN_ON(worker_queue_); 152 RTC_DCHECK(packet); 153 154 if (packet->Ssrc() == ssrc_ && 155 packet->packet_type() != RtpPacketMediaType::kRetransmission) { 156 if (last_sent_seq_.has_value()) { 157 RTC_DCHECK_EQ(static_cast<uint16_t>(*last_sent_seq_ + 1), 158 packet->SequenceNumber()); 159 } 160 last_sent_seq_ = packet->SequenceNumber(); 161 } else if (packet->Ssrc() == rtx_ssrc_) { 162 if (last_sent_rtx_seq_.has_value()) { 163 RTC_DCHECK_EQ(static_cast<uint16_t>(*last_sent_rtx_seq_ + 1), 164 packet->SequenceNumber()); 165 } 166 last_sent_rtx_seq_ = packet->SequenceNumber(); 167 } 168 169 RTC_DCHECK(packet->packet_type().has_value()); 170 RTC_DCHECK(HasCorrectSsrc(*packet)); 171 if (packet->packet_type() == RtpPacketMediaType::kRetransmission) { 172 RTC_DCHECK(packet->retransmitted_sequence_number().has_value()); 173 } 174 175 const Timestamp now = env_.clock().CurrentTime(); 176 if (need_rtp_packet_infos_ && 177 packet->packet_type() == RtpPacketToSend::Type::kVideo) { 178 // Last packet of a frame, add it to sequence number info map. 179 const uint32_t timestamp = packet->Timestamp() - timestamp_offset_; 180 rtp_sequence_number_map_->InsertPacket( 181 packet->SequenceNumber(), 182 RtpSequenceNumberMap::Info( 183 timestamp, packet->is_first_packet_of_frame(), packet->Marker())); 184 } 185 186 if (fec_generator_ && packet->fec_protect_packet()) { 187 // This packet should be protected by FEC, add it to packet generator. 188 RTC_DCHECK(fec_generator_); 189 RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo); 190 std::optional<std::pair<FecProtectionParams, FecProtectionParams>> 191 new_fec_params; 192 new_fec_params.swap(pending_fec_params_); 193 if (new_fec_params) { 194 fec_generator_->SetProtectionParameters(new_fec_params->first, 195 new_fec_params->second); 196 } 197 if (packet->is_red()) { 198 RtpPacketToSend unpacked_packet(*packet); 199 200 const CopyOnWriteBuffer buffer = packet->Buffer(); 201 // Grab media payload type from RED header. 202 const size_t headers_size = packet->headers_size(); 203 unpacked_packet.SetPayloadType(buffer[headers_size]); 204 205 // Copy the media payload into the unpacked buffer. 206 uint8_t* payload_buffer = 207 unpacked_packet.SetPayloadSize(packet->payload_size() - 1); 208 std::copy(&packet->payload()[0] + 1, 209 &packet->payload()[0] + packet->payload_size(), payload_buffer); 210 211 fec_generator_->AddPacketAndGenerateFec(unpacked_packet); 212 } else { 213 // If not RED encapsulated - we can just insert packet directly. 214 fec_generator_->AddPacketAndGenerateFec(*packet); 215 } 216 } 217 218 // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after 219 // the pacer, these modifications of the header below are happening after the 220 // FEC protection packets are calculated. This will corrupt recovered packets 221 // at the same place. It's not an issue for extensions, which are present in 222 // all the packets (their content just may be incorrect on recovered packets). 223 // In case of VideoTimingExtension, since it's present not in every packet, 224 // data after rtp header may be corrupted if these packets are protected by 225 // the FEC. 226 if (packet->HasExtension<TransmissionOffset>() && 227 packet->capture_time() > Timestamp::Zero()) { 228 TimeDelta diff = now - packet->capture_time(); 229 packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff.ms()); 230 } 231 if (packet->HasExtension<AbsoluteSendTime>()) { 232 if (use_ntp_time_for_absolute_send_time_) { 233 packet->SetExtension<AbsoluteSendTime>(AbsoluteSendTime::To24Bits( 234 env_.clock().ConvertTimestampToNtpTime(now))); 235 } else { 236 packet->SetExtension<AbsoluteSendTime>(AbsoluteSendTime::To24Bits(now)); 237 } 238 } 239 if (packet->HasExtension<TransportSequenceNumber>() && 240 packet->transport_sequence_number()) { 241 packet->SetExtension<TransportSequenceNumber>( 242 *packet->transport_sequence_number() & 0xFFFF); 243 } 244 245 if (packet->HasExtension<VideoTimingExtension>()) { 246 if (populate_network2_timestamp_) { 247 packet->set_network2_time(now); 248 } else { 249 packet->set_pacer_exit_time(now); 250 } 251 } 252 253 auto compound_packet = 254 Packet{.rtp_packet = std::move(packet), .info = pacing_info, .now = now}; 255 if (enable_send_packet_batching_ && !is_audio_) { 256 packets_to_send_.push_back(std::move(compound_packet)); 257 } else { 258 CompleteSendPacket(compound_packet, false); 259 } 260 } 261 262 void RtpSenderEgress::OnBatchComplete() { 263 RTC_DCHECK_RUN_ON(worker_queue_); 264 for (auto& packet : packets_to_send_) { 265 CompleteSendPacket(packet, &packet == &packets_to_send_.back()); 266 } 267 packets_to_send_.clear(); 268 } 269 270 void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, 271 bool last_in_batch) { 272 RTC_DCHECK_RUN_ON(worker_queue_); 273 auto& [packet, pacing_info, now] = compound_packet; 274 RTC_CHECK(packet); 275 276 PacketOptions options; 277 options.included_in_allocation = force_part_of_allocation_; 278 options.is_media = packet->packet_type() == RtpPacketMediaType::kAudio || 279 packet->packet_type() == RtpPacketMediaType::kVideo; 280 281 // Set Packet id from transport sequence number header extension if it is 282 // used. The source of the header extension is 283 // RtpPacketToSend::transport_sequence_number(), but the extension is only 16 284 // bit and will wrap. We should be able to use the 64bit value as id, but in 285 // order to not change behaviour we use the 16bit extension value if it is 286 // used. 287 std::optional<uint16_t> packet_id = 288 packet->GetExtension<TransportSequenceNumber>(); 289 if (packet_id.has_value()) { 290 options.packet_id = *packet_id; 291 options.included_in_feedback = true; 292 options.included_in_allocation = true; 293 } else if (packet->transport_sequence_number()) { 294 options.packet_id = *packet->transport_sequence_number(); 295 } 296 297 if (packet->packet_type() != RtpPacketMediaType::kPadding && 298 packet->packet_type() != RtpPacketMediaType::kRetransmission && 299 send_packet_observer_ != nullptr && packet->capture_time().IsFinite()) { 300 send_packet_observer_->OnSendPacket(packet_id, packet->capture_time(), 301 packet->Ssrc()); 302 } 303 options.send_as_ect1 = packet->send_as_ect1(); 304 options.batchable = enable_send_packet_batching_ && !is_audio_; 305 options.last_packet_in_batch = last_in_batch; 306 const bool send_success = SendPacketToNetwork(*packet, options, pacing_info); 307 308 // Put packet in retransmission history or update pending status even if 309 // actual sending fails. 310 if (options.is_media && packet->allow_retransmission()) { 311 packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet), 312 now); 313 } else if (packet->retransmitted_sequence_number()) { 314 packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number()); 315 } 316 317 if (send_success) { 318 // `media_has_been_sent_` is used by RTPSender to figure out if it can send 319 // padding in the absence of transport-cc or abs-send-time. 320 // In those cases media must be sent first to set a reference timestamp. 321 media_has_been_sent_ = true; 322 323 // TODO(sprang): Add support for FEC protecting all header extensions, add 324 // media packet to generator here instead. 325 326 RTC_DCHECK(packet->packet_type().has_value()); 327 RtpPacketMediaType packet_type = *packet->packet_type(); 328 RtpPacketCounter counter(*packet); 329 UpdateRtpStats(now, packet->Ssrc(), packet_type, std::move(counter), 330 packet->size()); 331 } 332 } 333 334 RtpSendRates RtpSenderEgress::GetSendRates(Timestamp now) const { 335 RTC_DCHECK_RUN_ON(worker_queue_); 336 RtpSendRates current_rates; 337 for (size_t i = 0; i < kNumMediaTypes; ++i) { 338 RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i); 339 current_rates[type] = send_rates_[i].Rate(now).value_or(DataRate::Zero()); 340 } 341 return current_rates; 342 } 343 344 void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats, 345 StreamDataCounters* rtx_stats) const { 346 RTC_DCHECK_RUN_ON(worker_queue_); 347 if (rtp_stats_callback_) { 348 *rtp_stats = rtp_stats_callback_->GetDataCounters(ssrc_); 349 if (rtx_ssrc_.has_value()) { 350 *rtx_stats = rtp_stats_callback_->GetDataCounters(*rtx_ssrc_); 351 } 352 } else { 353 *rtp_stats = rtp_stats_; 354 *rtx_stats = rtx_rtp_stats_; 355 } 356 } 357 358 void RtpSenderEgress::ForceIncludeSendPacketsInAllocation( 359 bool part_of_allocation) { 360 RTC_DCHECK_RUN_ON(worker_queue_); 361 force_part_of_allocation_ = part_of_allocation; 362 } 363 364 bool RtpSenderEgress::MediaHasBeenSent() const { 365 RTC_DCHECK_RUN_ON(worker_queue_); 366 return media_has_been_sent_; 367 } 368 369 void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) { 370 RTC_DCHECK_RUN_ON(worker_queue_); 371 media_has_been_sent_ = media_sent; 372 } 373 374 void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) { 375 RTC_DCHECK_RUN_ON(worker_queue_); 376 timestamp_offset_ = timestamp; 377 } 378 379 std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos( 380 ArrayView<const uint16_t> sequence_numbers) const { 381 RTC_DCHECK_RUN_ON(worker_queue_); 382 RTC_DCHECK(!sequence_numbers.empty()); 383 if (!need_rtp_packet_infos_) { 384 return std::vector<RtpSequenceNumberMap::Info>(); 385 } 386 387 std::vector<RtpSequenceNumberMap::Info> results; 388 results.reserve(sequence_numbers.size()); 389 390 for (uint16_t sequence_number : sequence_numbers) { 391 const auto& info = rtp_sequence_number_map_->Get(sequence_number); 392 if (!info) { 393 // The empty vector will be returned. We can delay the clearing 394 // of the vector until after we exit the critical section. 395 return std::vector<RtpSequenceNumberMap::Info>(); 396 } 397 results.push_back(*info); 398 } 399 400 return results; 401 } 402 403 void RtpSenderEgress::SetFecProtectionParameters( 404 const FecProtectionParams& delta_params, 405 const FecProtectionParams& key_params) { 406 RTC_DCHECK_RUN_ON(worker_queue_); 407 pending_fec_params_.emplace(delta_params, key_params); 408 } 409 410 std::vector<std::unique_ptr<RtpPacketToSend>> 411 RtpSenderEgress::FetchFecPackets() { 412 RTC_DCHECK_RUN_ON(worker_queue_); 413 if (fec_generator_) { 414 return fec_generator_->GetFecPackets(); 415 } 416 return {}; 417 } 418 419 void RtpSenderEgress::OnAbortedRetransmissions( 420 ArrayView<const uint16_t> sequence_numbers) { 421 RTC_DCHECK_RUN_ON(worker_queue_); 422 // Mark aborted retransmissions as sent, rather than leaving them in 423 // a 'pending' state - otherwise they can not be requested again and 424 // will not be cleared until the history has reached its max size. 425 for (uint16_t seq_no : sequence_numbers) { 426 packet_history_->MarkPacketAsSent(seq_no); 427 } 428 } 429 430 bool RtpSenderEgress::HasCorrectSsrc(const RtpPacketToSend& packet) const { 431 switch (*packet.packet_type()) { 432 case RtpPacketMediaType::kAudio: 433 case RtpPacketMediaType::kVideo: 434 return packet.Ssrc() == ssrc_; 435 case RtpPacketMediaType::kRetransmission: 436 case RtpPacketMediaType::kPadding: 437 // Both padding and retransmission must be on either the media or the 438 // RTX stream. 439 return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_; 440 case RtpPacketMediaType::kForwardErrorCorrection: 441 // FlexFEC is on separate SSRC, ULPFEC uses media SSRC. 442 return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_; 443 } 444 return false; 445 } 446 447 bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet, 448 const PacketOptions& options, 449 const PacedPacketInfo& pacing_info) { 450 RTC_DCHECK_RUN_ON(worker_queue_); 451 if (transport_ == nullptr || !transport_->SendRtp(packet, options)) { 452 RTC_LOG(LS_WARNING) << "Transport failed to send packet."; 453 return false; 454 } 455 456 env_.event_log().Log(std::make_unique<RtcEventRtpPacketOutgoing>( 457 packet, pacing_info.probe_cluster_id)); 458 return true; 459 } 460 461 void RtpSenderEgress::UpdateRtpStats(Timestamp now, 462 uint32_t packet_ssrc, 463 RtpPacketMediaType packet_type, 464 RtpPacketCounter counter, 465 size_t packet_size) { 466 RTC_DCHECK_RUN_ON(worker_queue_); 467 468 // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the 469 // worker thread. 470 RtpSendRates send_rates; 471 472 StreamDataCounters* counters = nullptr; 473 if (rtp_stats_callback_) { 474 rtp_stats_ = rtp_stats_callback_->GetDataCounters(packet_ssrc); 475 counters = &rtp_stats_; 476 } else { 477 counters = packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; 478 } 479 480 counters->MaybeSetFirstPacketTime(now); 481 482 if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) { 483 counters->fec.Add(counter); 484 } else if (packet_type == RtpPacketMediaType::kRetransmission) { 485 counters->retransmitted.Add(counter); 486 } 487 counters->transmitted.Add(counter); 488 489 send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now); 490 if (bitrate_callback_) { 491 send_rates = GetSendRates(now); 492 } 493 494 if (rtp_stats_callback_) { 495 rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc); 496 } 497 498 // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point 499 // to the same object, so these callbacks could be consolidated into one. 500 if (bitrate_callback_) { 501 bitrate_callback_->Notify( 502 send_rates.Sum().bps(), 503 send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); 504 } 505 } 506 507 void RtpSenderEgress::PeriodicUpdate() { 508 RTC_DCHECK_RUN_ON(worker_queue_); 509 RTC_DCHECK(bitrate_callback_); 510 RtpSendRates send_rates = GetSendRates(env_.clock().CurrentTime()); 511 bitrate_callback_->Notify( 512 send_rates.Sum().bps(), 513 send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); 514 } 515 } // namespace webrtc