tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

rtp_sender_egress.cc (19741B)


      1 /*
      2 *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
      3 *
      4 *  Use of this source code is governed by a BSD-style license
      5 *  that can be found in the LICENSE file in the root of the source
      6 *  tree. An additional intellectual property rights grant can be found
      7 *  in the file PATENTS.  All contributing project authors may
      8 *  be found in the AUTHORS file in the root of the source tree.
      9 */
     10 
     11 #include "modules/rtp_rtcp/source/rtp_sender_egress.h"
     12 
     13 #include <algorithm>
     14 #include <cstddef>
     15 #include <cstdint>
     16 #include <memory>
     17 #include <optional>
     18 #include <utility>
     19 #include <vector>
     20 
     21 #include "api/array_view.h"
     22 #include "api/call/transport.h"
     23 #include "api/environment/environment.h"
     24 #include "api/field_trials_view.h"
     25 #include "api/rtc_event_log/rtc_event_log.h"
     26 #include "api/sequence_checker.h"
     27 #include "api/task_queue/pending_task_safety_flag.h"
     28 #include "api/task_queue/task_queue_base.h"
     29 #include "api/transport/network_types.h"
     30 #include "api/units/data_rate.h"
     31 #include "api/units/time_delta.h"
     32 #include "api/units/timestamp.h"
     33 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
     34 #include "modules/include/module_fec_types.h"
     35 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
     36 #include "modules/rtp_rtcp/source/packet_sequencer.h"
     37 #include "modules/rtp_rtcp/source/rtp_header_extensions.h"
     38 #include "modules/rtp_rtcp/source/rtp_packet_history.h"
     39 #include "modules/rtp_rtcp/source/rtp_rtcp_interface.h"
     40 #include "modules/rtp_rtcp/source/rtp_sequence_number_map.h"
     41 #include "rtc_base/bitrate_tracker.h"
     42 #include "rtc_base/checks.h"
     43 #include "rtc_base/copy_on_write_buffer.h"
     44 #include "rtc_base/logging.h"
     45 #include "rtc_base/task_utils/repeating_task.h"
     46 
     47 namespace webrtc {
     48 namespace {
     49 constexpr uint32_t kTimestampTicksPerMs = 90;
     50 constexpr TimeDelta kBitrateStatisticsWindow = TimeDelta::Seconds(1);
     51 constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13;
     52 constexpr TimeDelta kUpdateInterval = kBitrateStatisticsWindow;
     53 }  // namespace
     54 
     55 RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender(
     56    TaskQueueBase& worker_queue,
     57    RtpSenderEgress* sender,
     58    PacketSequencer* sequencer)
     59    : worker_queue_(worker_queue),
     60      transport_sequence_number_(0),
     61      sender_(sender),
     62      sequencer_(sequencer) {
     63  RTC_DCHECK(sequencer);
     64 }
     65 RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() {
     66  RTC_DCHECK_RUN_ON(&worker_queue_);
     67 }
     68 
     69 void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
     70    std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
     71  if (!worker_queue_.IsCurrent()) {
     72    worker_queue_.PostTask(SafeTask(
     73        task_safety_.flag(), [this, packets = std::move(packets)]() mutable {
     74          EnqueuePackets(std::move(packets));
     75        }));
     76    return;
     77  }
     78  RTC_DCHECK_RUN_ON(&worker_queue_);
     79  for (auto& packet : packets) {
     80    PrepareForSend(packet.get());
     81    sender_->SendPacket(std::move(packet), PacedPacketInfo());
     82  }
     83  auto fec_packets = sender_->FetchFecPackets();
     84  if (!fec_packets.empty()) {
     85    EnqueuePackets(std::move(fec_packets));
     86  }
     87 }
     88 
     89 void RtpSenderEgress::NonPacedPacketSender::PrepareForSend(
     90    RtpPacketToSend* packet) {
     91  RTC_DCHECK_RUN_ON(&worker_queue_);
     92  // Assign sequence numbers, but not for flexfec which is already running on
     93  // an internally maintained sequence number series.
     94  if (packet->Ssrc() != sender_->FlexFecSsrc()) {
     95    sequencer_->Sequence(*packet);
     96  }
     97  if (!packet->SetExtension<TransportSequenceNumber>(
     98          ++transport_sequence_number_)) {
     99    --transport_sequence_number_;
    100  }
    101  packet->ReserveExtension<TransmissionOffset>();
    102  packet->ReserveExtension<AbsoluteSendTime>();
    103 }
    104 
    105 RtpSenderEgress::RtpSenderEgress(const Environment& env,
    106                                 const RtpRtcpInterface::Configuration& config,
    107                                 RtpPacketHistory* packet_history)
    108    : env_(env),
    109      enable_send_packet_batching_(config.enable_send_packet_batching),
    110      worker_queue_(TaskQueueBase::Current()),
    111      ssrc_(config.local_media_ssrc),
    112      rtx_ssrc_(config.rtx_send_ssrc),
    113      flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
    114                                         : std::nullopt),
    115      populate_network2_timestamp_(config.populate_network2_timestamp),
    116      packet_history_(packet_history),
    117      transport_(config.outgoing_transport),
    118      is_audio_(config.audio),
    119      need_rtp_packet_infos_(config.need_rtp_packet_infos),
    120      fec_generator_(config.fec_generator),
    121      send_packet_observer_(config.send_packet_observer),
    122      rtp_stats_callback_(config.rtp_stats_callback),
    123      bitrate_callback_(config.send_bitrate_observer),
    124      media_has_been_sent_(false),
    125      force_part_of_allocation_(false),
    126      timestamp_offset_(0),
    127      send_rates_(kNumMediaTypes, BitrateTracker(kBitrateStatisticsWindow)),
    128      rtp_sequence_number_map_(need_rtp_packet_infos_
    129                                   ? std::make_unique<RtpSequenceNumberMap>(
    130                                         kRtpSequenceNumberMapMaxEntries)
    131                                   : nullptr),
    132      use_ntp_time_for_absolute_send_time_(!env_.field_trials().IsDisabled(
    133          "WebRTC-UseNtpTimeAbsoluteSendTime")) {
    134  RTC_DCHECK(worker_queue_);
    135  if (bitrate_callback_) {
    136    update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_,
    137                                                     kUpdateInterval, [this]() {
    138                                                       PeriodicUpdate();
    139                                                       return kUpdateInterval;
    140                                                     });
    141  }
    142 }
    143 
    144 RtpSenderEgress::~RtpSenderEgress() {
    145  RTC_DCHECK_RUN_ON(worker_queue_);
    146  update_task_.Stop();
    147 }
    148 
    149 void RtpSenderEgress::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
    150                                 const PacedPacketInfo& pacing_info) {
    151  RTC_DCHECK_RUN_ON(worker_queue_);
    152  RTC_DCHECK(packet);
    153 
    154  if (packet->Ssrc() == ssrc_ &&
    155      packet->packet_type() != RtpPacketMediaType::kRetransmission) {
    156    if (last_sent_seq_.has_value()) {
    157      RTC_DCHECK_EQ(static_cast<uint16_t>(*last_sent_seq_ + 1),
    158                    packet->SequenceNumber());
    159    }
    160    last_sent_seq_ = packet->SequenceNumber();
    161  } else if (packet->Ssrc() == rtx_ssrc_) {
    162    if (last_sent_rtx_seq_.has_value()) {
    163      RTC_DCHECK_EQ(static_cast<uint16_t>(*last_sent_rtx_seq_ + 1),
    164                    packet->SequenceNumber());
    165    }
    166    last_sent_rtx_seq_ = packet->SequenceNumber();
    167  }
    168 
    169  RTC_DCHECK(packet->packet_type().has_value());
    170  RTC_DCHECK(HasCorrectSsrc(*packet));
    171  if (packet->packet_type() == RtpPacketMediaType::kRetransmission) {
    172    RTC_DCHECK(packet->retransmitted_sequence_number().has_value());
    173  }
    174 
    175  const Timestamp now = env_.clock().CurrentTime();
    176  if (need_rtp_packet_infos_ &&
    177      packet->packet_type() == RtpPacketToSend::Type::kVideo) {
    178    // Last packet of a frame, add it to sequence number info map.
    179    const uint32_t timestamp = packet->Timestamp() - timestamp_offset_;
    180    rtp_sequence_number_map_->InsertPacket(
    181        packet->SequenceNumber(),
    182        RtpSequenceNumberMap::Info(
    183            timestamp, packet->is_first_packet_of_frame(), packet->Marker()));
    184  }
    185 
    186  if (fec_generator_ && packet->fec_protect_packet()) {
    187    // This packet should be protected by FEC, add it to packet generator.
    188    RTC_DCHECK(fec_generator_);
    189    RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo);
    190    std::optional<std::pair<FecProtectionParams, FecProtectionParams>>
    191        new_fec_params;
    192    new_fec_params.swap(pending_fec_params_);
    193    if (new_fec_params) {
    194      fec_generator_->SetProtectionParameters(new_fec_params->first,
    195                                              new_fec_params->second);
    196    }
    197    if (packet->is_red()) {
    198      RtpPacketToSend unpacked_packet(*packet);
    199 
    200      const CopyOnWriteBuffer buffer = packet->Buffer();
    201      // Grab media payload type from RED header.
    202      const size_t headers_size = packet->headers_size();
    203      unpacked_packet.SetPayloadType(buffer[headers_size]);
    204 
    205      // Copy the media payload into the unpacked buffer.
    206      uint8_t* payload_buffer =
    207          unpacked_packet.SetPayloadSize(packet->payload_size() - 1);
    208      std::copy(&packet->payload()[0] + 1,
    209                &packet->payload()[0] + packet->payload_size(), payload_buffer);
    210 
    211      fec_generator_->AddPacketAndGenerateFec(unpacked_packet);
    212    } else {
    213      // If not RED encapsulated - we can just insert packet directly.
    214      fec_generator_->AddPacketAndGenerateFec(*packet);
    215    }
    216  }
    217 
    218  // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
    219  // the pacer, these modifications of the header below are happening after the
    220  // FEC protection packets are calculated. This will corrupt recovered packets
    221  // at the same place. It's not an issue for extensions, which are present in
    222  // all the packets (their content just may be incorrect on recovered packets).
    223  // In case of VideoTimingExtension, since it's present not in every packet,
    224  // data after rtp header may be corrupted if these packets are protected by
    225  // the FEC.
    226  if (packet->HasExtension<TransmissionOffset>() &&
    227      packet->capture_time() > Timestamp::Zero()) {
    228    TimeDelta diff = now - packet->capture_time();
    229    packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff.ms());
    230  }
    231  if (packet->HasExtension<AbsoluteSendTime>()) {
    232    if (use_ntp_time_for_absolute_send_time_) {
    233      packet->SetExtension<AbsoluteSendTime>(AbsoluteSendTime::To24Bits(
    234          env_.clock().ConvertTimestampToNtpTime(now)));
    235    } else {
    236      packet->SetExtension<AbsoluteSendTime>(AbsoluteSendTime::To24Bits(now));
    237    }
    238  }
    239  if (packet->HasExtension<TransportSequenceNumber>() &&
    240      packet->transport_sequence_number()) {
    241    packet->SetExtension<TransportSequenceNumber>(
    242        *packet->transport_sequence_number() & 0xFFFF);
    243  }
    244 
    245  if (packet->HasExtension<VideoTimingExtension>()) {
    246    if (populate_network2_timestamp_) {
    247      packet->set_network2_time(now);
    248    } else {
    249      packet->set_pacer_exit_time(now);
    250    }
    251  }
    252 
    253  auto compound_packet =
    254      Packet{.rtp_packet = std::move(packet), .info = pacing_info, .now = now};
    255  if (enable_send_packet_batching_ && !is_audio_) {
    256    packets_to_send_.push_back(std::move(compound_packet));
    257  } else {
    258    CompleteSendPacket(compound_packet, false);
    259  }
    260 }
    261 
    262 void RtpSenderEgress::OnBatchComplete() {
    263  RTC_DCHECK_RUN_ON(worker_queue_);
    264  for (auto& packet : packets_to_send_) {
    265    CompleteSendPacket(packet, &packet == &packets_to_send_.back());
    266  }
    267  packets_to_send_.clear();
    268 }
    269 
    270 void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet,
    271                                         bool last_in_batch) {
    272  RTC_DCHECK_RUN_ON(worker_queue_);
    273  auto& [packet, pacing_info, now] = compound_packet;
    274  RTC_CHECK(packet);
    275 
    276  PacketOptions options;
    277  options.included_in_allocation = force_part_of_allocation_;
    278  options.is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
    279                     packet->packet_type() == RtpPacketMediaType::kVideo;
    280 
    281  // Set Packet id from transport sequence number header extension if it is
    282  // used. The source of the header extension is
    283  // RtpPacketToSend::transport_sequence_number(), but the extension is only 16
    284  // bit and will wrap. We should be able to use the 64bit value as id, but in
    285  // order to not change behaviour we use the 16bit extension value if it is
    286  // used.
    287  std::optional<uint16_t> packet_id =
    288      packet->GetExtension<TransportSequenceNumber>();
    289  if (packet_id.has_value()) {
    290    options.packet_id = *packet_id;
    291    options.included_in_feedback = true;
    292    options.included_in_allocation = true;
    293  } else if (packet->transport_sequence_number()) {
    294    options.packet_id = *packet->transport_sequence_number();
    295  }
    296 
    297  if (packet->packet_type() != RtpPacketMediaType::kPadding &&
    298      packet->packet_type() != RtpPacketMediaType::kRetransmission &&
    299      send_packet_observer_ != nullptr && packet->capture_time().IsFinite()) {
    300    send_packet_observer_->OnSendPacket(packet_id, packet->capture_time(),
    301                                        packet->Ssrc());
    302  }
    303  options.send_as_ect1 = packet->send_as_ect1();
    304  options.batchable = enable_send_packet_batching_ && !is_audio_;
    305  options.last_packet_in_batch = last_in_batch;
    306  const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
    307 
    308  // Put packet in retransmission history or update pending status even if
    309  // actual sending fails.
    310  if (options.is_media && packet->allow_retransmission()) {
    311    packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet),
    312                                  now);
    313  } else if (packet->retransmitted_sequence_number()) {
    314    packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number());
    315  }
    316 
    317  if (send_success) {
    318    // `media_has_been_sent_` is used by RTPSender to figure out if it can send
    319    // padding in the absence of transport-cc or abs-send-time.
    320    // In those cases media must be sent first to set a reference timestamp.
    321    media_has_been_sent_ = true;
    322 
    323    // TODO(sprang): Add support for FEC protecting all header extensions, add
    324    // media packet to generator here instead.
    325 
    326    RTC_DCHECK(packet->packet_type().has_value());
    327    RtpPacketMediaType packet_type = *packet->packet_type();
    328    RtpPacketCounter counter(*packet);
    329    UpdateRtpStats(now, packet->Ssrc(), packet_type, std::move(counter),
    330                   packet->size());
    331  }
    332 }
    333 
    334 RtpSendRates RtpSenderEgress::GetSendRates(Timestamp now) const {
    335  RTC_DCHECK_RUN_ON(worker_queue_);
    336  RtpSendRates current_rates;
    337  for (size_t i = 0; i < kNumMediaTypes; ++i) {
    338    RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
    339    current_rates[type] = send_rates_[i].Rate(now).value_or(DataRate::Zero());
    340  }
    341  return current_rates;
    342 }
    343 
    344 void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats,
    345                                      StreamDataCounters* rtx_stats) const {
    346  RTC_DCHECK_RUN_ON(worker_queue_);
    347  if (rtp_stats_callback_) {
    348    *rtp_stats = rtp_stats_callback_->GetDataCounters(ssrc_);
    349    if (rtx_ssrc_.has_value()) {
    350      *rtx_stats = rtp_stats_callback_->GetDataCounters(*rtx_ssrc_);
    351    }
    352  } else {
    353    *rtp_stats = rtp_stats_;
    354    *rtx_stats = rtx_rtp_stats_;
    355  }
    356 }
    357 
    358 void RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
    359    bool part_of_allocation) {
    360  RTC_DCHECK_RUN_ON(worker_queue_);
    361  force_part_of_allocation_ = part_of_allocation;
    362 }
    363 
    364 bool RtpSenderEgress::MediaHasBeenSent() const {
    365  RTC_DCHECK_RUN_ON(worker_queue_);
    366  return media_has_been_sent_;
    367 }
    368 
    369 void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
    370  RTC_DCHECK_RUN_ON(worker_queue_);
    371  media_has_been_sent_ = media_sent;
    372 }
    373 
    374 void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) {
    375  RTC_DCHECK_RUN_ON(worker_queue_);
    376  timestamp_offset_ = timestamp;
    377 }
    378 
    379 std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos(
    380    ArrayView<const uint16_t> sequence_numbers) const {
    381  RTC_DCHECK_RUN_ON(worker_queue_);
    382  RTC_DCHECK(!sequence_numbers.empty());
    383  if (!need_rtp_packet_infos_) {
    384    return std::vector<RtpSequenceNumberMap::Info>();
    385  }
    386 
    387  std::vector<RtpSequenceNumberMap::Info> results;
    388  results.reserve(sequence_numbers.size());
    389 
    390  for (uint16_t sequence_number : sequence_numbers) {
    391    const auto& info = rtp_sequence_number_map_->Get(sequence_number);
    392    if (!info) {
    393      // The empty vector will be returned. We can delay the clearing
    394      // of the vector until after we exit the critical section.
    395      return std::vector<RtpSequenceNumberMap::Info>();
    396    }
    397    results.push_back(*info);
    398  }
    399 
    400  return results;
    401 }
    402 
    403 void RtpSenderEgress::SetFecProtectionParameters(
    404    const FecProtectionParams& delta_params,
    405    const FecProtectionParams& key_params) {
    406  RTC_DCHECK_RUN_ON(worker_queue_);
    407  pending_fec_params_.emplace(delta_params, key_params);
    408 }
    409 
    410 std::vector<std::unique_ptr<RtpPacketToSend>>
    411 RtpSenderEgress::FetchFecPackets() {
    412  RTC_DCHECK_RUN_ON(worker_queue_);
    413  if (fec_generator_) {
    414    return fec_generator_->GetFecPackets();
    415  }
    416  return {};
    417 }
    418 
    419 void RtpSenderEgress::OnAbortedRetransmissions(
    420    ArrayView<const uint16_t> sequence_numbers) {
    421  RTC_DCHECK_RUN_ON(worker_queue_);
    422  // Mark aborted retransmissions as sent, rather than leaving them in
    423  // a 'pending' state - otherwise they can not be requested again and
    424  // will not be cleared until the history has reached its max size.
    425  for (uint16_t seq_no : sequence_numbers) {
    426    packet_history_->MarkPacketAsSent(seq_no);
    427  }
    428 }
    429 
    430 bool RtpSenderEgress::HasCorrectSsrc(const RtpPacketToSend& packet) const {
    431  switch (*packet.packet_type()) {
    432    case RtpPacketMediaType::kAudio:
    433    case RtpPacketMediaType::kVideo:
    434      return packet.Ssrc() == ssrc_;
    435    case RtpPacketMediaType::kRetransmission:
    436    case RtpPacketMediaType::kPadding:
    437      // Both padding and retransmission must be on either the media or the
    438      // RTX stream.
    439      return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_;
    440    case RtpPacketMediaType::kForwardErrorCorrection:
    441      // FlexFEC is on separate SSRC, ULPFEC uses media SSRC.
    442      return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_;
    443  }
    444  return false;
    445 }
    446 
    447 bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
    448                                          const PacketOptions& options,
    449                                          const PacedPacketInfo& pacing_info) {
    450  RTC_DCHECK_RUN_ON(worker_queue_);
    451  if (transport_ == nullptr || !transport_->SendRtp(packet, options)) {
    452    RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
    453    return false;
    454  }
    455 
    456  env_.event_log().Log(std::make_unique<RtcEventRtpPacketOutgoing>(
    457      packet, pacing_info.probe_cluster_id));
    458  return true;
    459 }
    460 
    461 void RtpSenderEgress::UpdateRtpStats(Timestamp now,
    462                                     uint32_t packet_ssrc,
    463                                     RtpPacketMediaType packet_type,
    464                                     RtpPacketCounter counter,
    465                                     size_t packet_size) {
    466  RTC_DCHECK_RUN_ON(worker_queue_);
    467 
    468  // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the
    469  // worker thread.
    470  RtpSendRates send_rates;
    471 
    472  StreamDataCounters* counters = nullptr;
    473  if (rtp_stats_callback_) {
    474    rtp_stats_ = rtp_stats_callback_->GetDataCounters(packet_ssrc);
    475    counters = &rtp_stats_;
    476  } else {
    477    counters = packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
    478  }
    479 
    480  counters->MaybeSetFirstPacketTime(now);
    481 
    482  if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
    483    counters->fec.Add(counter);
    484  } else if (packet_type == RtpPacketMediaType::kRetransmission) {
    485    counters->retransmitted.Add(counter);
    486  }
    487  counters->transmitted.Add(counter);
    488 
    489  send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now);
    490  if (bitrate_callback_) {
    491    send_rates = GetSendRates(now);
    492  }
    493 
    494  if (rtp_stats_callback_) {
    495    rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc);
    496  }
    497 
    498  // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point
    499  // to the same object, so these callbacks could be consolidated into one.
    500  if (bitrate_callback_) {
    501    bitrate_callback_->Notify(
    502        send_rates.Sum().bps(),
    503        send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
    504  }
    505 }
    506 
    507 void RtpSenderEgress::PeriodicUpdate() {
    508  RTC_DCHECK_RUN_ON(worker_queue_);
    509  RTC_DCHECK(bitrate_callback_);
    510  RtpSendRates send_rates = GetSendRates(env_.clock().CurrentTime());
    511  bitrate_callback_->Notify(
    512      send_rates.Sum().bps(),
    513      send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
    514 }
    515 }  // namespace webrtc