transport_feedback_adapter.cc (16226B)
1 /* 2 * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/congestion_controller/rtp/transport_feedback_adapter.h" 12 13 #include <algorithm> 14 #include <cstdint> 15 #include <cstdlib> 16 #include <optional> 17 #include <utility> 18 #include <vector> 19 20 #include "absl/algorithm/container.h" 21 #include "api/transport/ecn_marking.h" 22 #include "api/transport/network_types.h" 23 #include "api/units/data_size.h" 24 #include "api/units/time_delta.h" 25 #include "api/units/timestamp.h" 26 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 27 #include "modules/rtp_rtcp/source/ntp_time_util.h" 28 #include "modules/rtp_rtcp/source/rtcp_packet/congestion_control_feedback.h" 29 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" 30 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" 31 #include "rtc_base/checks.h" 32 #include "rtc_base/logging.h" 33 #include "rtc_base/network/sent_packet.h" 34 #include "rtc_base/network_route.h" 35 36 namespace webrtc { 37 38 constexpr TimeDelta kSendTimeHistoryWindow = TimeDelta::Seconds(60); 39 40 void InFlightBytesTracker::AddInFlightPacketBytes( 41 const PacketFeedback& packet) { 42 RTC_DCHECK(packet.sent.send_time.IsFinite()); 43 auto it = in_flight_data_.find(packet.network_route); 44 if (it != in_flight_data_.end()) { 45 it->second += packet.sent.size; 46 } else { 47 in_flight_data_.insert({packet.network_route, packet.sent.size}); 48 } 49 } 50 51 void InFlightBytesTracker::RemoveInFlightPacketBytes( 52 const PacketFeedback& packet) { 53 if (packet.sent.send_time.IsInfinite()) 54 return; 55 auto it = in_flight_data_.find(packet.network_route); 56 if (it != in_flight_data_.end()) { 57 RTC_DCHECK_GE(it->second, packet.sent.size); 58 it->second -= packet.sent.size; 59 if (it->second.IsZero()) 60 in_flight_data_.erase(it); 61 } 62 } 63 64 DataSize InFlightBytesTracker::GetOutstandingData( 65 const NetworkRoute& network_route) const { 66 auto it = in_flight_data_.find(network_route); 67 if (it != in_flight_data_.end()) { 68 return it->second; 69 } else { 70 return DataSize::Zero(); 71 } 72 } 73 74 // Comparator for consistent map with NetworkRoute as key. 75 bool InFlightBytesTracker::NetworkRouteComparator::operator()( 76 const NetworkRoute& a, 77 const NetworkRoute& b) const { 78 if (a.local.network_id() != b.local.network_id()) 79 return a.local.network_id() < b.local.network_id(); 80 if (a.remote.network_id() != b.remote.network_id()) 81 return a.remote.network_id() < b.remote.network_id(); 82 83 if (a.local.adapter_id() != b.local.adapter_id()) 84 return a.local.adapter_id() < b.local.adapter_id(); 85 if (a.remote.adapter_id() != b.remote.adapter_id()) 86 return a.remote.adapter_id() < b.remote.adapter_id(); 87 88 if (a.local.uses_turn() != b.local.uses_turn()) 89 return a.local.uses_turn() < b.local.uses_turn(); 90 if (a.remote.uses_turn() != b.remote.uses_turn()) 91 return a.remote.uses_turn() < b.remote.uses_turn(); 92 93 return a.connected < b.connected; 94 } 95 96 TransportFeedbackAdapter::TransportFeedbackAdapter() = default; 97 98 void TransportFeedbackAdapter::AddPacket(const RtpPacketToSend& packet_to_send, 99 const PacedPacketInfo& pacing_info, 100 size_t overhead_bytes, 101 Timestamp creation_time) { 102 RTC_DCHECK(packet_to_send.transport_sequence_number()); 103 PacketFeedback feedback; 104 105 feedback.creation_time = creation_time; 106 // Note, if transport sequence number header extension is used, transport 107 // sequence numbers are wrapped to 16 bit. See 108 // RtpSenderEgress::CompleteSendPacket. 109 feedback.sent.sequence_number = seq_num_unwrapper_.Unwrap( 110 packet_to_send.transport_sequence_number().value_or(0)); 111 feedback.sent.size = DataSize::Bytes(packet_to_send.size() + overhead_bytes); 112 feedback.sent.audio = 113 packet_to_send.packet_type() == RtpPacketMediaType::kAudio; 114 feedback.network_route = network_route_; 115 feedback.sent.pacing_info = pacing_info; 116 feedback.ssrc = packet_to_send.Ssrc(); 117 feedback.rtp_sequence_number = packet_to_send.SequenceNumber(); 118 feedback.is_retransmission = 119 packet_to_send.packet_type() == RtpPacketMediaType::kRetransmission; 120 121 while (!history_.empty() && 122 creation_time - history_.begin()->second.creation_time > 123 kSendTimeHistoryWindow) { 124 // TODO(sprang): Warn if erasing (too many) old items? 125 if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_) 126 in_flight_.RemoveInFlightPacketBytes(history_.begin()->second); 127 128 const PacketFeedback& packet = history_.begin()->second; 129 rtp_to_transport_sequence_number_.erase( 130 {.ssrc = packet.ssrc, 131 .rtp_sequence_number = packet.rtp_sequence_number}); 132 history_.erase(history_.begin()); 133 } 134 // Note that it can happen that the same SSRC and sequence number is sent 135 // again. e.g, audio retransmission. 136 rtp_to_transport_sequence_number_.emplace( 137 SsrcAndRtpSequencenumber( 138 {.ssrc = feedback.ssrc, 139 .rtp_sequence_number = feedback.rtp_sequence_number}), 140 feedback.sent.sequence_number); 141 history_.emplace(feedback.sent.sequence_number, feedback); 142 } 143 144 std::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket( 145 const SentPacketInfo& sent_packet) { 146 auto send_time = Timestamp::Millis(sent_packet.send_time_ms); 147 // TODO(srte): Only use one way to indicate that packet feedback is used. 148 if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) { 149 int64_t unwrapped_seq_num = 150 seq_num_unwrapper_.Unwrap(sent_packet.packet_id); 151 auto it = history_.find(unwrapped_seq_num); 152 if (it != history_.end()) { 153 bool packet_retransmit = it->second.sent.send_time.IsFinite(); 154 it->second.sent.send_time = send_time; 155 last_send_time_ = std::max(last_send_time_, send_time); 156 // TODO(srte): Don't do this on retransmit. 157 if (!pending_untracked_size_.IsZero()) { 158 if (send_time < last_untracked_send_time_) 159 RTC_LOG(LS_WARNING) 160 << "appending acknowledged data for out of order packet. (Diff: " 161 << ToString(last_untracked_send_time_ - send_time) << " ms.)"; 162 it->second.sent.prior_unacked_data += pending_untracked_size_; 163 pending_untracked_size_ = DataSize::Zero(); 164 } 165 if (!packet_retransmit) { 166 if (it->second.sent.sequence_number > last_ack_seq_num_) 167 in_flight_.AddInFlightPacketBytes(it->second); 168 it->second.sent.data_in_flight = GetOutstandingData(); 169 return it->second.sent; 170 } 171 } 172 } else if (sent_packet.info.included_in_allocation) { 173 if (send_time < last_send_time_) { 174 RTC_LOG(LS_WARNING) << "ignoring untracked data for out of order packet."; 175 } 176 pending_untracked_size_ += 177 DataSize::Bytes(sent_packet.info.packet_size_bytes); 178 last_untracked_send_time_ = std::max(last_untracked_send_time_, send_time); 179 } 180 return std::nullopt; 181 } 182 183 std::optional<TransportPacketsFeedback> 184 TransportFeedbackAdapter::ProcessTransportFeedback( 185 const rtcp::TransportFeedback& feedback, 186 Timestamp feedback_receive_time) { 187 if (feedback.GetPacketStatusCount() == 0) { 188 RTC_LOG(LS_INFO) << "Empty transport feedback packet received."; 189 return std::nullopt; 190 } 191 192 // Add timestamp deltas to a local time base selected on first packet arrival. 193 // This won't be the true time base, but makes it easier to manually inspect 194 // time stamps. 195 if (last_transport_feedback_base_time_.IsInfinite()) { 196 current_offset_ = feedback_receive_time; 197 } else { 198 // TODO(srte): We shouldn't need to do rounding here. 199 const TimeDelta delta = 200 feedback.GetBaseDelta(last_transport_feedback_base_time_) 201 .RoundDownTo(TimeDelta::Millis(1)); 202 // Protect against assigning current_offset_ negative value. 203 if (delta < Timestamp::Zero() - current_offset_) { 204 RTC_LOG(LS_WARNING) << "Unexpected feedback timestamp received."; 205 current_offset_ = feedback_receive_time; 206 } else { 207 current_offset_ += delta; 208 } 209 } 210 last_transport_feedback_base_time_ = feedback.BaseTime(); 211 212 std::vector<PacketResult> packet_result_vector; 213 packet_result_vector.reserve(feedback.GetPacketStatusCount()); 214 215 size_t failed_lookups = 0; 216 size_t ignored = 0; 217 218 feedback.ForAllPackets([&](uint16_t sequence_number, 219 TimeDelta delta_since_base) { 220 int64_t seq_num = seq_num_unwrapper_.Unwrap(sequence_number); 221 std::optional<PacketFeedback> packet_feedback = RetrievePacketFeedback( 222 seq_num, /*received=*/delta_since_base.IsFinite()); 223 if (!packet_feedback) { 224 ++failed_lookups; 225 return; 226 } 227 228 if (packet_feedback->network_route == network_route_) { 229 PacketResult result; 230 result.sent_packet = packet_feedback->sent; 231 if (delta_since_base.IsFinite()) { 232 result.receive_time = current_offset_ + delta_since_base.RoundDownTo( 233 TimeDelta::Millis(1)); 234 } 235 result.rtp_packet_info = { 236 .ssrc = packet_feedback->ssrc, 237 .rtp_sequence_number = packet_feedback->rtp_sequence_number, 238 .is_retransmission = packet_feedback->is_retransmission}; 239 packet_result_vector.push_back(result); 240 } else { 241 ++ignored; 242 } 243 }); 244 245 if (failed_lookups > 0) { 246 RTC_LOG(LS_WARNING) 247 << "Failed to lookup send time for " << failed_lookups << " packet" 248 << (failed_lookups > 1 ? "s" : "") 249 << ". Packets reordered or send time history too small?"; 250 } 251 if (ignored > 0) { 252 RTC_LOG(LS_INFO) << "Ignoring " << ignored 253 << " packets because they were sent on a different route."; 254 } 255 return ToTransportFeedback(std::move(packet_result_vector), 256 feedback_receive_time, /*supports_ecn=*/false); 257 } 258 259 std::optional<TransportPacketsFeedback> 260 TransportFeedbackAdapter::ProcessCongestionControlFeedback( 261 const rtcp::CongestionControlFeedback& feedback, 262 Timestamp feedback_receive_time) { 263 if (feedback.packets().empty()) { 264 RTC_LOG(LS_INFO) << "Empty congestion control feedback packet received."; 265 return std::nullopt; 266 } 267 if (current_offset_.IsInfinite()) { 268 current_offset_ = feedback_receive_time; 269 } 270 TimeDelta feedback_delta = last_feedback_compact_ntp_time_ 271 ? CompactNtpIntervalToTimeDelta( 272 feedback.report_timestamp_compact_ntp() - 273 *last_feedback_compact_ntp_time_) 274 : TimeDelta::Zero(); 275 last_feedback_compact_ntp_time_ = feedback.report_timestamp_compact_ntp(); 276 if (feedback_delta < TimeDelta::Zero()) { 277 RTC_LOG(LS_WARNING) << "Unexpected feedback ntp time delta " 278 << feedback_delta << "."; 279 current_offset_ = feedback_receive_time; 280 } else { 281 current_offset_ += feedback_delta; 282 } 283 284 int ignored_packets = 0; 285 int failed_lookups = 0; 286 bool supports_ecn = true; 287 std::vector<PacketResult> packet_result_vector; 288 for (const rtcp::CongestionControlFeedback::PacketInfo& packet_info : 289 feedback.packets()) { 290 std::optional<PacketFeedback> packet_feedback = RetrievePacketFeedback( 291 {.ssrc = packet_info.ssrc, 292 .rtp_sequence_number = packet_info.sequence_number}, 293 /*received=*/packet_info.arrival_time_offset.IsFinite()); 294 if (!packet_feedback) { 295 ++failed_lookups; 296 continue; 297 } 298 if (packet_feedback->network_route != network_route_) { 299 ++ignored_packets; 300 continue; 301 } 302 PacketResult result; 303 result.sent_packet = packet_feedback->sent; 304 if (packet_info.arrival_time_offset.IsFinite()) { 305 result.receive_time = current_offset_ - packet_info.arrival_time_offset; 306 TimeDelta rtt = feedback_receive_time - result.sent_packet.send_time - 307 packet_info.arrival_time_offset; 308 if (smoothed_rtt_.IsInfinite()) { 309 smoothed_rtt_ = rtt; 310 } 311 smoothed_rtt_ = (smoothed_rtt_ * 7 + rtt) / 8; // RFC 6298, alpha = 1/8 312 supports_ecn &= packet_info.ecn != EcnMarking::kNotEct; 313 } 314 result.ecn = packet_info.ecn; 315 result.rtp_packet_info = { 316 .ssrc = packet_feedback->ssrc, 317 .rtp_sequence_number = packet_feedback->rtp_sequence_number, 318 .is_retransmission = packet_feedback->is_retransmission}; 319 packet_result_vector.push_back(result); 320 } 321 322 if (failed_lookups > 0) { 323 RTC_LOG(LS_WARNING) 324 << "Failed to lookup send time for " << failed_lookups << " packet" 325 << (failed_lookups > 1 ? "s" : "") 326 << ". Packets reordered or send time history too small?"; 327 } 328 if (ignored_packets > 0) { 329 RTC_LOG(LS_INFO) << "Ignoring " << ignored_packets 330 << " packets because they were sent on a different route."; 331 } 332 333 // Feedback is expected to be sorted in send order. 334 absl::c_sort(packet_result_vector, [](const PacketResult& lhs, 335 const PacketResult& rhs) { 336 return lhs.sent_packet.sequence_number < rhs.sent_packet.sequence_number; 337 }); 338 return ToTransportFeedback(std::move(packet_result_vector), 339 feedback_receive_time, supports_ecn); 340 } 341 342 std::optional<TransportPacketsFeedback> 343 TransportFeedbackAdapter::ToTransportFeedback( 344 std::vector<PacketResult> packet_results, 345 Timestamp feedback_receive_time, 346 bool supports_ecn) { 347 TransportPacketsFeedback msg; 348 msg.feedback_time = feedback_receive_time; 349 if (packet_results.empty()) { 350 return std::nullopt; 351 } 352 msg.packet_feedbacks = std::move(packet_results); 353 msg.data_in_flight = in_flight_.GetOutstandingData(network_route_); 354 msg.transport_supports_ecn = supports_ecn; 355 msg.smoothed_rtt = smoothed_rtt_; 356 357 return msg; 358 } 359 360 void TransportFeedbackAdapter::SetNetworkRoute( 361 const NetworkRoute& network_route) { 362 network_route_ = network_route; 363 smoothed_rtt_ = TimeDelta::PlusInfinity(); 364 } 365 366 DataSize TransportFeedbackAdapter::GetOutstandingData() const { 367 return in_flight_.GetOutstandingData(network_route_); 368 } 369 370 std::optional<PacketFeedback> TransportFeedbackAdapter::RetrievePacketFeedback( 371 const SsrcAndRtpSequencenumber& key, 372 bool received) { 373 auto it = rtp_to_transport_sequence_number_.find(key); 374 if (it == rtp_to_transport_sequence_number_.end()) { 375 return std::nullopt; 376 } 377 return RetrievePacketFeedback(it->second, received); 378 } 379 380 std::optional<PacketFeedback> TransportFeedbackAdapter::RetrievePacketFeedback( 381 int64_t transport_seq_num, 382 bool received) { 383 if (transport_seq_num > last_ack_seq_num_) { 384 // Starts at history_.begin() if last_ack_seq_num_ < 0, since any 385 // valid sequence number is >= 0. 386 for (auto it = history_.upper_bound(last_ack_seq_num_); 387 it != history_.upper_bound(transport_seq_num); ++it) { 388 in_flight_.RemoveInFlightPacketBytes(it->second); 389 } 390 last_ack_seq_num_ = transport_seq_num; 391 } 392 393 auto it = history_.find(transport_seq_num); 394 if (it == history_.end()) { 395 RTC_LOG(LS_WARNING) << "Failed to lookup send time for packet with " 396 << transport_seq_num 397 << ". Send time history too small?"; 398 return std::nullopt; 399 } 400 401 if (it->second.sent.send_time.IsInfinite()) { 402 // TODO(srte): Fix the tests that makes this happen and make this a 403 // DCHECK. 404 RTC_DLOG(LS_ERROR) 405 << "Received feedback before packet was indicated as sent"; 406 return std::nullopt; 407 } 408 409 PacketFeedback packet_feedback = it->second; 410 if (received) { 411 // Note: Lost packets are not removed from history because they might 412 // be reported as received by a later feedback. 413 rtp_to_transport_sequence_number_.erase( 414 {.ssrc = packet_feedback.ssrc, 415 .rtp_sequence_number = packet_feedback.rtp_sequence_number}); 416 history_.erase(it); 417 } 418 return packet_feedback; 419 } 420 421 } // namespace webrtc