receive_statistics_impl.cc (15644B)
1 /* 2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "modules/rtp_rtcp/source/receive_statistics_impl.h" 12 13 #include <algorithm> 14 #include <cmath> 15 #include <cstdint> 16 #include <cstdlib> 17 #include <functional> 18 #include <memory> 19 #include <optional> 20 #include <utility> 21 #include <vector> 22 23 #include "api/units/data_rate.h" 24 #include "api/units/time_delta.h" 25 #include "api/units/timestamp.h" 26 #include "modules/rtp_rtcp/include/receive_statistics.h" 27 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 28 #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h" 29 #include "modules/rtp_rtcp/source/rtp_packet_received.h" 30 #include "modules/rtp_rtcp/source/rtp_rtcp_config.h" 31 #include "rtc_base/checks.h" 32 #include "rtc_base/logging.h" 33 #include "rtc_base/time_utils.h" 34 #include "system_wrappers/include/clock.h" 35 #include "system_wrappers/include/ntp_time.h" 36 37 namespace webrtc { 38 namespace { 39 constexpr TimeDelta kStatisticsTimeout = TimeDelta::Seconds(8); 40 constexpr TimeDelta kStatisticsProcessInterval = TimeDelta::Seconds(1); 41 42 TimeDelta UnixEpochDelta(Clock& clock) { 43 Timestamp now = clock.CurrentTime(); 44 NtpTime ntp_now = clock.ConvertTimestampToNtpTime(now); 45 return TimeDelta::Millis(ntp_now.ToMs() - now.ms() - kNtpJan1970Millisecs); 46 } 47 48 } // namespace 49 50 StreamStatistician::~StreamStatistician() {} 51 52 StreamStatisticianImpl::StreamStatisticianImpl(uint32_t ssrc, Clock* clock) 53 : ssrc_(ssrc), 54 clock_(clock), 55 delta_internal_unix_epoch_(UnixEpochDelta(*clock_)), 56 incoming_bitrate_(/*max_window_size=*/kStatisticsProcessInterval), 57 max_reordering_threshold_(kDefaultMaxReorderingThreshold), 58 enable_retransmit_detection_(false), 59 cumulative_loss_is_capped_(false), 60 jitter_q4_(0), 61 cumulative_loss_(0), 62 cumulative_loss_rtcp_offset_(0), 63 last_received_timestamp_(0), 64 received_seq_first_(-1), 65 received_seq_max_(-1), 66 last_report_cumulative_loss_(0), 67 last_report_seq_max_(-1), 68 last_payload_type_frequency_(0) {} 69 70 StreamStatisticianImpl::~StreamStatisticianImpl() = default; 71 72 bool StreamStatisticianImpl::UpdateOutOfOrder(const RtpPacketReceived& packet, 73 int64_t sequence_number, 74 Timestamp now) { 75 // Check if `packet` is second packet of a stream restart. 76 if (received_seq_out_of_order_) { 77 // Count the previous packet as a received; it was postponed below. 78 --cumulative_loss_; 79 80 uint16_t expected_sequence_number = *received_seq_out_of_order_ + 1; 81 received_seq_out_of_order_ = std::nullopt; 82 if (packet.SequenceNumber() == expected_sequence_number) { 83 // Ignore sequence number gap caused by stream restart for packet loss 84 // calculation, by setting received_seq_max_ to the sequence number just 85 // before the out-of-order seqno. This gives a net zero change of 86 // `cumulative_loss_`, for the two packets interpreted as a stream reset. 87 // 88 // Fraction loss for the next report may get a bit off, since we don't 89 // update last_report_seq_max_ and last_report_cumulative_loss_ in a 90 // consistent way. 91 last_report_seq_max_ = sequence_number - 2; 92 received_seq_max_ = sequence_number - 2; 93 return false; 94 } 95 } 96 97 if (std::abs(sequence_number - received_seq_max_) > 98 max_reordering_threshold_) { 99 // Sequence number gap looks too large, wait until next packet to check 100 // for a stream restart. 101 received_seq_out_of_order_ = packet.SequenceNumber(); 102 // Postpone counting this as a received packet until we know how to update 103 // `received_seq_max_`, otherwise we temporarily decrement 104 // `cumulative_loss_`. The 105 // ReceiveStatisticsTest.StreamRestartDoesntCountAsLoss test expects 106 // `cumulative_loss_` to be unchanged by the reception of the first packet 107 // after stream reset. 108 ++cumulative_loss_; 109 return true; 110 } 111 112 if (sequence_number > received_seq_max_) 113 return false; 114 115 // Old out of order packet, may be retransmit. 116 if (enable_retransmit_detection_ && IsRetransmitOfOldPacket(packet, now)) 117 receive_counters_.retransmitted.AddPacket(packet); 118 return true; 119 } 120 121 void StreamStatisticianImpl::UpdateCounters(const RtpPacketReceived& packet) { 122 RTC_DCHECK_EQ(ssrc_, packet.Ssrc()); 123 Timestamp now = clock_->CurrentTime(); 124 125 incoming_bitrate_.Update(packet.size(), now); 126 receive_counters_.transmitted.AddPacket(packet); 127 --cumulative_loss_; 128 129 // Use PeekUnwrap and later update the state to avoid updating the state for 130 // out of order packets. 131 int64_t sequence_number = seq_unwrapper_.PeekUnwrap(packet.SequenceNumber()); 132 133 if (!ReceivedRtpPacket()) { 134 received_seq_first_ = sequence_number; 135 last_report_seq_max_ = sequence_number - 1; 136 received_seq_max_ = sequence_number - 1; 137 receive_counters_.first_packet_time = now; 138 } else if (UpdateOutOfOrder(packet, sequence_number, now)) { 139 return; 140 } 141 // In order packet. 142 cumulative_loss_ += sequence_number - received_seq_max_; 143 received_seq_max_ = sequence_number; 144 // Update the internal state of `seq_unwrapper_`. 145 seq_unwrapper_.Unwrap(packet.SequenceNumber()); 146 147 // If new time stamp and more than one in-order packet received, calculate 148 // new jitter statistics. 149 if (packet.Timestamp() != last_received_timestamp_ && 150 (receive_counters_.transmitted.packets - 151 receive_counters_.retransmitted.packets) > 1) { 152 UpdateJitter(packet, now); 153 } 154 last_received_timestamp_ = packet.Timestamp(); 155 last_receive_time_ = now; 156 } 157 158 void StreamStatisticianImpl::UpdateJitter(const RtpPacketReceived& packet, 159 Timestamp receive_time) { 160 RTC_DCHECK(last_receive_time_.has_value()); 161 TimeDelta receive_diff = receive_time - *last_receive_time_; 162 RTC_DCHECK_GE(receive_diff, TimeDelta::Zero()); 163 uint32_t receive_diff_rtp = 164 (receive_diff * packet.payload_type_frequency()).seconds<uint32_t>(); 165 int32_t time_diff_samples = 166 receive_diff_rtp - (packet.Timestamp() - last_received_timestamp_); 167 168 ReviseFrequencyAndJitter(packet.payload_type_frequency()); 169 170 // lib_jingle sometimes deliver crazy jumps in TS for the same stream. 171 // If this happens, don't update jitter value. Use 5 secs video frequency 172 // as the threshold. 173 if (time_diff_samples < 5 * kVideoPayloadTypeFrequency && 174 time_diff_samples > -5 * kVideoPayloadTypeFrequency) { 175 // Note we calculate in Q4 to avoid using float. 176 int32_t jitter_diff_q4 = (std::abs(time_diff_samples) << 4) - jitter_q4_; 177 jitter_q4_ += ((jitter_diff_q4 + 8) >> 4); 178 } 179 } 180 181 void StreamStatisticianImpl::ReviseFrequencyAndJitter( 182 int payload_type_frequency) { 183 if (payload_type_frequency == last_payload_type_frequency_) { 184 return; 185 } 186 187 if (payload_type_frequency != 0) { 188 if (last_payload_type_frequency_ != 0) { 189 // Value in "jitter_q4_" variable is a number of samples. 190 // I.e. jitter = timestamp (s) * frequency (Hz). 191 // Since the frequency has changed we have to update the number of samples 192 // accordingly. The new value should rely on a new frequency. 193 194 // If we don't do such procedure we end up with the number of samples that 195 // cannot be converted into TimeDelta correctly 196 // (i.e. jitter = jitter_q4_ >> 4 / payload_type_frequency). 197 // In such case, the number of samples has a "mix". 198 199 // Doing so we pretend that everything prior and including the current 200 // packet were computed on packet's frequency. 201 jitter_q4_ = static_cast<int>(static_cast<uint64_t>(jitter_q4_) * 202 payload_type_frequency / 203 last_payload_type_frequency_); 204 } 205 // If last_payload_type_frequency_ is not present, the jitter_q4_ 206 // variable has its initial value. 207 208 // Keep last_payload_type_frequency_ up to date and non-zero (set). 209 last_payload_type_frequency_ = payload_type_frequency; 210 } 211 } 212 213 void StreamStatisticianImpl::SetMaxReorderingThreshold( 214 int max_reordering_threshold) { 215 max_reordering_threshold_ = max_reordering_threshold; 216 } 217 218 void StreamStatisticianImpl::EnableRetransmitDetection(bool enable) { 219 enable_retransmit_detection_ = enable; 220 } 221 222 RtpReceiveStats StreamStatisticianImpl::GetStats() const { 223 RtpReceiveStats stats; 224 stats.packets_lost = cumulative_loss_; 225 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16. 226 stats.jitter = jitter_q4_ >> 4; 227 if (last_payload_type_frequency_ > 0) { 228 // Divide value in fractional seconds by frequency to get jitter in 229 // fractional seconds. 230 stats.interarrival_jitter = 231 TimeDelta::Seconds(stats.jitter) / last_payload_type_frequency_; 232 } 233 if (last_receive_time_.has_value()) { 234 stats.last_packet_received = 235 *last_receive_time_ + delta_internal_unix_epoch_; 236 } 237 stats.packet_counter = receive_counters_.transmitted; 238 return stats; 239 } 240 241 void StreamStatisticianImpl::MaybeAppendReportBlockAndReset( 242 std::vector<rtcp::ReportBlock>& report_blocks) { 243 if (!ReceivedRtpPacket()) { 244 return; 245 } 246 Timestamp now = clock_->CurrentTime(); 247 if (now - *last_receive_time_ >= kStatisticsTimeout) { 248 // Not active. 249 return; 250 } 251 252 report_blocks.emplace_back(); 253 rtcp::ReportBlock& stats = report_blocks.back(); 254 stats.SetMediaSsrc(ssrc_); 255 // Calculate fraction lost. 256 int64_t exp_since_last = received_seq_max_ - last_report_seq_max_; 257 RTC_DCHECK_GE(exp_since_last, 0); 258 259 int32_t lost_since_last = cumulative_loss_ - last_report_cumulative_loss_; 260 if (exp_since_last > 0 && lost_since_last > 0) { 261 // Scale 0 to 255, where 255 is 100% loss. 262 stats.SetFractionLost(255 * lost_since_last / exp_since_last); 263 } 264 265 int packets_lost = cumulative_loss_ + cumulative_loss_rtcp_offset_; 266 if (packets_lost < 0) { 267 // Clamp to zero. Work around to accommodate for senders that misbehave with 268 // negative cumulative loss. 269 packets_lost = 0; 270 cumulative_loss_rtcp_offset_ = -cumulative_loss_; 271 } 272 if (packets_lost > 0x7fffff) { 273 // Packets lost is a 24 bit signed field, and thus should be clamped, as 274 // described in https://datatracker.ietf.org/doc/html/rfc3550#appendix-A.3 275 if (!cumulative_loss_is_capped_) { 276 cumulative_loss_is_capped_ = true; 277 RTC_LOG(LS_WARNING) << "Cumulative loss reached maximum value for ssrc " 278 << ssrc_; 279 } 280 packets_lost = 0x7fffff; 281 } 282 stats.SetCumulativeLost(packets_lost); 283 stats.SetExtHighestSeqNum(received_seq_max_); 284 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16. 285 stats.SetJitter(jitter_q4_ >> 4); 286 287 // Only for report blocks in RTCP SR and RR. 288 last_report_cumulative_loss_ = cumulative_loss_; 289 last_report_seq_max_ = received_seq_max_; 290 } 291 292 std::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const { 293 if (!ReceivedRtpPacket()) { 294 return std::nullopt; 295 } 296 int64_t expected_packets = 1 + received_seq_max_ - received_seq_first_; 297 if (expected_packets <= 0) { 298 return std::nullopt; 299 } 300 if (cumulative_loss_ <= 0) { 301 return 0; 302 } 303 return 100 * static_cast<int64_t>(cumulative_loss_) / expected_packets; 304 } 305 306 StreamDataCounters StreamStatisticianImpl::GetReceiveStreamDataCounters() 307 const { 308 return receive_counters_; 309 } 310 311 uint32_t StreamStatisticianImpl::BitrateReceived() const { 312 return incoming_bitrate_.Rate(clock_->CurrentTime()) 313 .value_or(DataRate::Zero()) 314 .bps<uint32_t>(); 315 } 316 317 bool StreamStatisticianImpl::IsRetransmitOfOldPacket( 318 const RtpPacketReceived& packet, 319 Timestamp now) const { 320 int frequency_hz = packet.payload_type_frequency(); 321 RTC_DCHECK(last_receive_time_.has_value()); 322 RTC_CHECK_GT(frequency_hz, 0); 323 TimeDelta time_diff = now - *last_receive_time_; 324 325 // Diff in time stamp since last received in order. 326 uint32_t timestamp_diff = packet.Timestamp() - last_received_timestamp_; 327 TimeDelta rtp_time_stamp_diff = 328 TimeDelta::Seconds(timestamp_diff) / frequency_hz; 329 330 // Jitter standard deviation in samples. 331 float jitter_std = std::sqrt(static_cast<float>(jitter_q4_ >> 4)); 332 333 // 2 times the standard deviation => 95% confidence. 334 // Min max_delay is 1ms. 335 TimeDelta max_delay = std::max( 336 TimeDelta::Seconds(2 * jitter_std / frequency_hz), TimeDelta::Millis(1)); 337 338 return time_diff > rtp_time_stamp_diff + max_delay; 339 } 340 341 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::Create(Clock* clock) { 342 return std::make_unique<ReceiveStatisticsLocked>( 343 clock, [](uint32_t ssrc, Clock* clock) { 344 return std::make_unique<StreamStatisticianLocked>(ssrc, clock); 345 }); 346 } 347 348 std::unique_ptr<ReceiveStatistics> ReceiveStatistics::CreateThreadCompatible( 349 Clock* clock) { 350 return std::make_unique<ReceiveStatisticsImpl>( 351 clock, [](uint32_t ssrc, Clock* clock) { 352 return std::make_unique<StreamStatisticianImpl>(ssrc, clock); 353 }); 354 } 355 356 ReceiveStatisticsImpl::ReceiveStatisticsImpl( 357 Clock* clock, 358 std::function<std::unique_ptr<StreamStatisticianImplInterface>( 359 uint32_t ssrc, 360 Clock* clock)> stream_statistician_factory) 361 : clock_(clock), 362 stream_statistician_factory_(std::move(stream_statistician_factory)), 363 last_returned_ssrc_idx_(0) {} 364 365 void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) { 366 // StreamStatisticianImpl instance is created once and only destroyed when 367 // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has 368 // it's own locking so don't hold receive_statistics_lock_ (potential 369 // deadlock). 370 GetOrCreateStatistician(packet.Ssrc())->UpdateCounters(packet); 371 } 372 373 StreamStatistician* ReceiveStatisticsImpl::GetStatistician( 374 uint32_t ssrc) const { 375 const auto& it = statisticians_.find(ssrc); 376 if (it == statisticians_.end()) 377 return nullptr; 378 return it->second.get(); 379 } 380 381 StreamStatisticianImplInterface* ReceiveStatisticsImpl::GetOrCreateStatistician( 382 uint32_t ssrc) { 383 std::unique_ptr<StreamStatisticianImplInterface>& impl = statisticians_[ssrc]; 384 if (impl == nullptr) { // new element 385 impl = stream_statistician_factory_(ssrc, clock_); 386 all_ssrcs_.push_back(ssrc); 387 } 388 return impl.get(); 389 } 390 391 void ReceiveStatisticsImpl::SetMaxReorderingThreshold( 392 uint32_t ssrc, 393 int max_reordering_threshold) { 394 GetOrCreateStatistician(ssrc)->SetMaxReorderingThreshold( 395 max_reordering_threshold); 396 } 397 398 void ReceiveStatisticsImpl::EnableRetransmitDetection(uint32_t ssrc, 399 bool enable) { 400 GetOrCreateStatistician(ssrc)->EnableRetransmitDetection(enable); 401 } 402 403 std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks( 404 size_t max_blocks) { 405 std::vector<rtcp::ReportBlock> result; 406 result.reserve(std::min(max_blocks, all_ssrcs_.size())); 407 408 size_t ssrc_idx = 0; 409 for (size_t i = 0; i < all_ssrcs_.size() && result.size() < max_blocks; ++i) { 410 ssrc_idx = (last_returned_ssrc_idx_ + i + 1) % all_ssrcs_.size(); 411 const uint32_t media_ssrc = all_ssrcs_[ssrc_idx]; 412 auto statistician_it = statisticians_.find(media_ssrc); 413 RTC_DCHECK(statistician_it != statisticians_.end()); 414 statistician_it->second->MaybeAppendReportBlockAndReset(result); 415 } 416 last_returned_ssrc_idx_ = ssrc_idx; 417 return result; 418 } 419 420 } // namespace webrtc