rtp_packet_history.cc (12615B)
1 /* 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/rtp_rtcp/source/rtp_packet_history.h" 12 13 #include <algorithm> 14 #include <cstddef> 15 #include <cstdint> 16 #include <limits> 17 #include <memory> 18 #include <optional> 19 #include <utility> 20 21 #include "api/array_view.h" 22 #include "api/environment/environment.h" 23 #include "api/function_view.h" 24 #include "api/units/time_delta.h" 25 #include "api/units/timestamp.h" 26 #include "modules/include/module_common_types_public.h" 27 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" 28 #include "rtc_base/checks.h" 29 #include "rtc_base/logging.h" 30 #include "rtc_base/synchronization/mutex.h" 31 #include "system_wrappers/include/clock.h" 32 33 namespace webrtc { 34 35 namespace { 36 37 constexpr size_t kOldPayloadPaddingSizeHysteresis = 100; 38 constexpr uint16_t kMaxOldPayloadPaddingSequenceNumber = 1 << 13; 39 40 } // namespace 41 42 RtpPacketHistory::StoredPacket::StoredPacket( 43 std::unique_ptr<RtpPacketToSend> packet, 44 Timestamp send_time, 45 uint64_t insert_order) 46 : packet_(std::move(packet)), 47 pending_transmission_(false), 48 send_time_(send_time), 49 insert_order_(insert_order), 50 times_retransmitted_(0) {} 51 52 RtpPacketHistory::StoredPacket::StoredPacket(StoredPacket&&) = default; 53 RtpPacketHistory::StoredPacket& RtpPacketHistory::StoredPacket::operator=( 54 RtpPacketHistory::StoredPacket&&) = default; 55 RtpPacketHistory::StoredPacket::~StoredPacket() = default; 56 57 void RtpPacketHistory::StoredPacket::IncrementTimesRetransmitted() { 58 ++times_retransmitted_; 59 } 60 61 RtpPacketHistory::RtpPacketHistory(const Environment& env, 62 PaddingMode padding_mode) 63 : clock_(&env.clock()), 64 padding_mode_(padding_mode), 65 number_to_store_(0), 66 mode_(StorageMode::kDisabled), 67 rtt_(TimeDelta::MinusInfinity()), 68 packets_inserted_(0) {} 69 70 RtpPacketHistory::~RtpPacketHistory() {} 71 72 void RtpPacketHistory::SetStorePacketsStatus(StorageMode mode, 73 size_t number_to_store) { 74 RTC_DCHECK_LE(number_to_store, kMaxCapacity); 75 MutexLock lock(&lock_); 76 if (mode != StorageMode::kDisabled && mode_ != StorageMode::kDisabled) { 77 RTC_LOG(LS_WARNING) << "Purging packet history in order to re-set status."; 78 } 79 Reset(); 80 mode_ = mode; 81 number_to_store_ = std::min(kMaxCapacity, number_to_store); 82 } 83 84 RtpPacketHistory::StorageMode RtpPacketHistory::GetStorageMode() const { 85 MutexLock lock(&lock_); 86 return mode_; 87 } 88 89 void RtpPacketHistory::SetRtt(TimeDelta rtt) { 90 MutexLock lock(&lock_); 91 RTC_DCHECK_GE(rtt, TimeDelta::Zero()); 92 rtt_ = rtt; 93 // If storage is not disabled, packets will be removed after a timeout 94 // that depends on the RTT. Changing the RTT may thus cause some packets 95 // become "old" and subject to removal. 96 if (mode_ != StorageMode::kDisabled) { 97 CullOldPackets(); 98 } 99 } 100 101 void RtpPacketHistory::PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet, 102 Timestamp send_time) { 103 RTC_DCHECK(packet); 104 MutexLock lock(&lock_); 105 if (mode_ == StorageMode::kDisabled) { 106 return; 107 } 108 109 RTC_DCHECK(packet->allow_retransmission()); 110 CullOldPackets(); 111 112 // Store packet. 113 const uint16_t rtp_seq_no = packet->SequenceNumber(); 114 int packet_index = GetPacketIndex(rtp_seq_no); 115 if (packet_index >= 0 && 116 static_cast<size_t>(packet_index) < packet_history_.size() && 117 packet_history_[packet_index].packet_ != nullptr) { 118 RTC_LOG(LS_WARNING) << "Duplicate packet inserted: " << rtp_seq_no; 119 // Remove previous packet to avoid inconsistent state. 120 RemovePacket(packet_index); 121 packet_index = GetPacketIndex(rtp_seq_no); 122 } 123 124 // Packet to be inserted ahead of first packet, expand front. 125 for (; packet_index < 0; ++packet_index) { 126 packet_history_.emplace_front(); 127 } 128 // Packet to be inserted behind last packet, expand back. 129 while (static_cast<int>(packet_history_.size()) <= packet_index) { 130 packet_history_.emplace_back(); 131 } 132 133 RTC_DCHECK_GE(packet_index, 0); 134 RTC_DCHECK_LT(packet_index, packet_history_.size()); 135 RTC_DCHECK(packet_history_[packet_index].packet_ == nullptr); 136 137 if (padding_mode_ == PaddingMode::kRecentLargePacket) { 138 if ((!large_payload_packet_ || 139 packet->payload_size() + kOldPayloadPaddingSizeHysteresis > 140 large_payload_packet_->payload_size() || 141 IsNewerSequenceNumber(packet->SequenceNumber(), 142 large_payload_packet_->SequenceNumber() + 143 kMaxOldPayloadPaddingSequenceNumber))) { 144 large_payload_packet_.emplace(*packet); 145 } 146 } 147 148 packet_history_[packet_index] = 149 StoredPacket(std::move(packet), send_time, packets_inserted_++); 150 } 151 152 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending( 153 uint16_t sequence_number) { 154 return GetPacketAndMarkAsPending( 155 sequence_number, [](const RtpPacketToSend& packet) { 156 return std::make_unique<RtpPacketToSend>(packet); 157 }); 158 } 159 160 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending( 161 uint16_t sequence_number, 162 FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)> 163 encapsulate) { 164 MutexLock lock(&lock_); 165 if (mode_ == StorageMode::kDisabled) { 166 return nullptr; 167 } 168 169 StoredPacket* packet = GetStoredPacket(sequence_number); 170 if (packet == nullptr) { 171 return nullptr; 172 } 173 174 if (packet->pending_transmission_) { 175 // Packet already in pacer queue, ignore this request. 176 return nullptr; 177 } 178 179 if (!VerifyRtt(*packet)) { 180 // Packet already resent within too short a time window, ignore. 181 return nullptr; 182 } 183 184 // Copy and/or encapsulate packet. 185 std::unique_ptr<RtpPacketToSend> encapsulated_packet = 186 encapsulate(*packet->packet_); 187 if (encapsulated_packet) { 188 packet->pending_transmission_ = true; 189 } 190 191 return encapsulated_packet; 192 } 193 194 void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) { 195 MutexLock lock(&lock_); 196 if (mode_ == StorageMode::kDisabled) { 197 return; 198 } 199 200 StoredPacket* packet = GetStoredPacket(sequence_number); 201 if (packet == nullptr) { 202 return; 203 } 204 205 // Update send-time, mark as no longer in pacer queue, and increment 206 // transmission count. 207 packet->set_send_time(clock_->CurrentTime()); 208 packet->pending_transmission_ = false; 209 packet->IncrementTimesRetransmitted(); 210 } 211 212 bool RtpPacketHistory::GetPacketState(uint16_t sequence_number) const { 213 MutexLock lock(&lock_); 214 if (mode_ == StorageMode::kDisabled) { 215 return false; 216 } 217 218 int packet_index = GetPacketIndex(sequence_number); 219 if (packet_index < 0 || 220 static_cast<size_t>(packet_index) >= packet_history_.size()) { 221 return false; 222 } 223 const StoredPacket& packet = packet_history_[packet_index]; 224 if (packet.packet_ == nullptr) { 225 return false; 226 } 227 228 if (!VerifyRtt(packet)) { 229 return false; 230 } 231 232 return true; 233 } 234 235 bool RtpPacketHistory::VerifyRtt( 236 const RtpPacketHistory::StoredPacket& packet) const { 237 if (packet.times_retransmitted() > 0 && 238 clock_->CurrentTime() - packet.send_time() < rtt_) { 239 // This packet has already been retransmitted once, and the time since 240 // that even is lower than on RTT. Ignore request as this packet is 241 // likely already in the network pipe. 242 return false; 243 } 244 245 return true; 246 } 247 248 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket() { 249 // Default implementation always just returns a copy of the packet. 250 return GetPayloadPaddingPacket([](const RtpPacketToSend& packet) { 251 return std::make_unique<RtpPacketToSend>(packet); 252 }); 253 } 254 255 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket( 256 FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)> 257 encapsulate) { 258 MutexLock lock(&lock_); 259 if (mode_ == StorageMode::kDisabled) { 260 return nullptr; 261 } 262 if (padding_mode_ == PaddingMode::kRecentLargePacket && 263 large_payload_packet_) { 264 return encapsulate(*large_payload_packet_); 265 } 266 267 StoredPacket* best_packet = nullptr; 268 if (!packet_history_.empty()) { 269 // Pick the last packet. 270 for (auto it = packet_history_.rbegin(); it != packet_history_.rend(); 271 ++it) { 272 if (it->packet_ != nullptr) { 273 best_packet = &(*it); 274 break; 275 } 276 } 277 } 278 if (best_packet == nullptr) { 279 return nullptr; 280 } 281 282 if (best_packet->pending_transmission_) { 283 // Because PacedSender releases it's lock when it calls 284 // GeneratePadding() there is the potential for a race where a new 285 // packet ends up here instead of the regular transmit path. In such a 286 // case, just return empty and it will be picked up on the next 287 // Process() call. 288 return nullptr; 289 } 290 291 auto padding_packet = encapsulate(*best_packet->packet_); 292 if (!padding_packet) { 293 return nullptr; 294 } 295 296 best_packet->set_send_time(clock_->CurrentTime()); 297 best_packet->IncrementTimesRetransmitted(); 298 return padding_packet; 299 } 300 301 void RtpPacketHistory::CullAcknowledgedPackets( 302 ArrayView<const uint16_t> sequence_numbers) { 303 MutexLock lock(&lock_); 304 for (uint16_t sequence_number : sequence_numbers) { 305 int packet_index = GetPacketIndex(sequence_number); 306 if (packet_index < 0 || 307 static_cast<size_t>(packet_index) >= packet_history_.size()) { 308 continue; 309 } 310 RemovePacket(packet_index); 311 } 312 } 313 314 void RtpPacketHistory::Clear() { 315 MutexLock lock(&lock_); 316 Reset(); 317 } 318 319 void RtpPacketHistory::Reset() { 320 packet_history_.clear(); 321 large_payload_packet_ = std::nullopt; 322 } 323 324 void RtpPacketHistory::CullOldPackets() { 325 Timestamp now = clock_->CurrentTime(); 326 TimeDelta packet_duration = 327 rtt_.IsFinite() 328 ? std::max(kMinPacketDurationRtt * rtt_, kMinPacketDuration) 329 : kMinPacketDuration; 330 while (!packet_history_.empty()) { 331 if (packet_history_.size() >= kMaxCapacity) { 332 // We have reached the absolute max capacity, remove one packet 333 // unconditionally. 334 RemovePacket(0); 335 continue; 336 } 337 338 const StoredPacket& stored_packet = packet_history_.front(); 339 if (stored_packet.pending_transmission_) { 340 // Don't remove packets in the pacer queue, pending tranmission. 341 return; 342 } 343 344 if (stored_packet.send_time() + packet_duration > now) { 345 // Don't cull packets too early to avoid failed retransmission requests. 346 return; 347 } 348 349 if (packet_history_.size() >= number_to_store_ || 350 stored_packet.send_time() + 351 (packet_duration * kPacketCullingDelayFactor) <= 352 now) { 353 // Too many packets in history, or this packet has timed out. Remove it 354 // and continue. 355 RemovePacket(0); 356 } else { 357 // No more packets can be removed right now. 358 return; 359 } 360 } 361 } 362 363 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::RemovePacket( 364 int packet_index) { 365 // Move the packet out from the StoredPacket container. 366 std::unique_ptr<RtpPacketToSend> rtp_packet = 367 std::move(packet_history_[packet_index].packet_); 368 if (packet_index == 0) { 369 while (!packet_history_.empty() && 370 packet_history_.front().packet_ == nullptr) { 371 packet_history_.pop_front(); 372 } 373 } 374 375 return rtp_packet; 376 } 377 378 int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const { 379 if (packet_history_.empty()) { 380 return 0; 381 } 382 383 RTC_DCHECK(packet_history_.front().packet_ != nullptr); 384 int first_seq = packet_history_.front().packet_->SequenceNumber(); 385 if (first_seq == sequence_number) { 386 return 0; 387 } 388 389 int packet_index = sequence_number - first_seq; 390 constexpr int kSeqNumSpan = std::numeric_limits<uint16_t>::max() + 1; 391 392 if (IsNewerSequenceNumber(sequence_number, first_seq)) { 393 if (sequence_number < first_seq) { 394 // Forward wrap. 395 packet_index += kSeqNumSpan; 396 } 397 } else if (sequence_number > first_seq) { 398 // Backwards wrap. 399 packet_index -= kSeqNumSpan; 400 } 401 402 return packet_index; 403 } 404 405 RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket( 406 uint16_t sequence_number) { 407 int index = GetPacketIndex(sequence_number); 408 if (index < 0 || static_cast<size_t>(index) >= packet_history_.size() || 409 packet_history_[index].packet_ == nullptr) { 410 return nullptr; 411 } 412 return &packet_history_[index]; 413 } 414 415 } // namespace webrtc