fake_network_pipe.cc (13300B)
1 /* 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "call/fake_network_pipe.h" 12 13 #include <algorithm> 14 #include <cstdint> 15 #include <cstring> 16 #include <memory> 17 #include <optional> 18 #include <queue> 19 #include <utility> 20 #include <vector> 21 22 #include "api/array_view.h" 23 #include "api/call/transport.h" 24 #include "api/media_types.h" 25 #include "api/test/simulated_network.h" 26 #include "api/units/timestamp.h" 27 #include "modules/rtp_rtcp/source/rtp_packet_received.h" 28 #include "rtc_base/checks.h" 29 #include "rtc_base/copy_on_write_buffer.h" 30 #include "rtc_base/logging.h" 31 #include "rtc_base/synchronization/mutex.h" 32 #include "system_wrappers/include/clock.h" 33 34 namespace webrtc { 35 36 namespace { 37 constexpr int64_t kLogIntervalMs = 5000; 38 } // namespace 39 40 NetworkPacket::NetworkPacket(CopyOnWriteBuffer packet, 41 int64_t send_time, 42 int64_t arrival_time, 43 std::optional<PacketOptions> packet_options, 44 bool is_rtcp, 45 MediaType media_type, 46 std::optional<int64_t> packet_time_us, 47 Transport* transport) 48 : packet_(std::move(packet)), 49 send_time_(send_time), 50 arrival_time_(arrival_time), 51 packet_options_(packet_options), 52 is_rtcp_(is_rtcp), 53 media_type_(media_type), 54 packet_time_us_(packet_time_us), 55 transport_(transport) {} 56 57 NetworkPacket::NetworkPacket(RtpPacketReceived packet_received, 58 MediaType media_type, 59 int64_t send_time, 60 int64_t arrival_time) 61 : packet_(packet_received.Buffer()), 62 send_time_(send_time), 63 arrival_time_(arrival_time), 64 is_rtcp_(false), 65 media_type_(media_type), 66 packet_time_us_(packet_received.arrival_time().us()), 67 packet_received_(std::move(packet_received)), 68 transport_(nullptr) {} 69 70 NetworkPacket::NetworkPacket(NetworkPacket&& o) 71 : packet_(std::move(o.packet_)), 72 send_time_(o.send_time_), 73 arrival_time_(o.arrival_time_), 74 packet_options_(o.packet_options_), 75 is_rtcp_(o.is_rtcp_), 76 media_type_(o.media_type_), 77 packet_time_us_(o.packet_time_us_), 78 packet_received_(std::move(o.packet_received_)), 79 transport_(o.transport_) {} 80 81 NetworkPacket::~NetworkPacket() = default; 82 83 NetworkPacket& NetworkPacket::operator=(NetworkPacket&& o) { 84 packet_ = std::move(o.packet_); 85 send_time_ = o.send_time_; 86 arrival_time_ = o.arrival_time_; 87 packet_options_ = o.packet_options_; 88 is_rtcp_ = o.is_rtcp_; 89 media_type_ = o.media_type_; 90 packet_time_us_ = o.packet_time_us_; 91 packet_received_ = o.packet_received_; 92 transport_ = o.transport_; 93 94 return *this; 95 } 96 97 FakeNetworkPipe::FakeNetworkPipe( 98 Clock* clock, 99 std::unique_ptr<NetworkBehaviorInterface> network_behavior) 100 : FakeNetworkPipe(clock, std::move(network_behavior), nullptr, 1) {} 101 102 FakeNetworkPipe::FakeNetworkPipe( 103 Clock* clock, 104 std::unique_ptr<NetworkBehaviorInterface> network_behavior, 105 PacketReceiver* receiver) 106 : FakeNetworkPipe(clock, std::move(network_behavior), receiver, 1) {} 107 108 FakeNetworkPipe::FakeNetworkPipe( 109 Clock* clock, 110 std::unique_ptr<NetworkBehaviorInterface> network_behavior, 111 PacketReceiver* receiver, 112 uint64_t /* seed */) 113 : clock_(clock), 114 network_behavior_(std::move(network_behavior)), 115 receiver_(receiver), 116 clock_offset_ms_(0), 117 dropped_packets_(0), 118 sent_packets_(0), 119 total_packet_delay_us_(0), 120 last_log_time_us_(clock_->TimeInMicroseconds()) {} 121 122 FakeNetworkPipe::~FakeNetworkPipe() { 123 RTC_DCHECK(active_transports_.empty()); 124 } 125 126 void FakeNetworkPipe::SetReceiver(PacketReceiver* receiver) { 127 MutexLock lock(&config_lock_); 128 receiver_ = receiver; 129 } 130 131 void FakeNetworkPipe::AddActiveTransport(Transport* transport) { 132 MutexLock lock(&config_lock_); 133 active_transports_[transport]++; 134 } 135 136 void FakeNetworkPipe::RemoveActiveTransport(Transport* transport) { 137 MutexLock lock(&config_lock_); 138 auto it = active_transports_.find(transport); 139 RTC_CHECK(it != active_transports_.end()); 140 if (--(it->second) == 0) { 141 active_transports_.erase(it); 142 } 143 } 144 145 bool FakeNetworkPipe::SendRtp(ArrayView<const uint8_t> packet, 146 const PacketOptions& options, 147 Transport* transport) { 148 RTC_DCHECK(transport); 149 EnqueuePacket(CopyOnWriteBuffer(packet), options, false, transport); 150 return true; 151 } 152 153 bool FakeNetworkPipe::SendRtcp(ArrayView<const uint8_t> packet, 154 Transport* transport) { 155 RTC_DCHECK(transport); 156 EnqueuePacket(CopyOnWriteBuffer(packet), std::nullopt, true, transport); 157 return true; 158 } 159 160 void FakeNetworkPipe::DeliverRtpPacket( 161 MediaType media_type, 162 RtpPacketReceived packet, 163 OnUndemuxablePacketHandler /* undemuxable_packet_handler */) { 164 MutexLock lock(&process_lock_); 165 int64_t time_now_us = clock_->TimeInMicroseconds(); 166 EnqueuePacket( 167 NetworkPacket(std::move(packet), media_type, time_now_us, time_now_us)); 168 } 169 170 void FakeNetworkPipe::DeliverRtcpPacket(CopyOnWriteBuffer packet) { 171 EnqueuePacket(std::move(packet), std::nullopt, true, MediaType::ANY, 172 std::nullopt); 173 } 174 175 void FakeNetworkPipe::SetClockOffset(int64_t offset_ms) { 176 MutexLock lock(&config_lock_); 177 clock_offset_ms_ = offset_ms; 178 } 179 180 FakeNetworkPipe::StoredPacket::StoredPacket(NetworkPacket&& packet) 181 : packet(std::move(packet)) {} 182 183 bool FakeNetworkPipe::EnqueuePacket(CopyOnWriteBuffer packet, 184 std::optional<PacketOptions> options, 185 bool is_rtcp, 186 MediaType media_type, 187 std::optional<int64_t> packet_time_us) { 188 MutexLock lock(&process_lock_); 189 int64_t time_now_us = clock_->TimeInMicroseconds(); 190 return EnqueuePacket(NetworkPacket(std::move(packet), time_now_us, 191 time_now_us, options, is_rtcp, media_type, 192 packet_time_us, nullptr)); 193 } 194 195 bool FakeNetworkPipe::EnqueuePacket(CopyOnWriteBuffer packet, 196 std::optional<PacketOptions> options, 197 bool is_rtcp, 198 Transport* transport) { 199 MutexLock lock(&process_lock_); 200 int64_t time_now_us = clock_->TimeInMicroseconds(); 201 return EnqueuePacket(NetworkPacket(std::move(packet), time_now_us, 202 time_now_us, options, is_rtcp, 203 MediaType::ANY, std::nullopt, transport)); 204 } 205 206 bool FakeNetworkPipe::EnqueuePacket(NetworkPacket&& net_packet) { 207 int64_t send_time_us = net_packet.send_time(); 208 size_t packet_size = net_packet.data_length(); 209 210 packets_in_flight_.emplace_back(StoredPacket(std::move(net_packet))); 211 int64_t packet_id = reinterpret_cast<uint64_t>(&packets_in_flight_.back()); 212 bool sent = network_behavior_->EnqueuePacket( 213 PacketInFlightInfo(packet_size, send_time_us, packet_id)); 214 215 if (!sent) { 216 packets_in_flight_.pop_back(); 217 ++dropped_packets_; 218 } 219 return sent; 220 } 221 222 float FakeNetworkPipe::PercentageLoss() { 223 MutexLock lock(&process_lock_); 224 if (sent_packets_ == 0) 225 return 0; 226 227 return static_cast<float>(dropped_packets_) / 228 (sent_packets_ + dropped_packets_); 229 } 230 231 int FakeNetworkPipe::AverageDelay() { 232 MutexLock lock(&process_lock_); 233 if (sent_packets_ == 0) 234 return 0; 235 236 return static_cast<int>(total_packet_delay_us_ / 237 (1000 * static_cast<int64_t>(sent_packets_))); 238 } 239 240 size_t FakeNetworkPipe::DroppedPackets() { 241 MutexLock lock(&process_lock_); 242 return dropped_packets_; 243 } 244 245 size_t FakeNetworkPipe::SentPackets() { 246 MutexLock lock(&process_lock_); 247 return sent_packets_; 248 } 249 250 void FakeNetworkPipe::Process() { 251 int64_t time_now_us; 252 std::queue<NetworkPacket> packets_to_deliver; 253 { 254 MutexLock lock(&process_lock_); 255 time_now_us = clock_->TimeInMicroseconds(); 256 if (time_now_us - last_log_time_us_ > kLogIntervalMs * 1000) { 257 int64_t queueing_delay_us = 0; 258 if (!packets_in_flight_.empty()) 259 queueing_delay_us = 260 time_now_us - packets_in_flight_.front().packet.send_time(); 261 262 RTC_LOG(LS_INFO) << "Network queue: " << queueing_delay_us / 1000 263 << " ms."; 264 last_log_time_us_ = time_now_us; 265 } 266 267 std::vector<PacketDeliveryInfo> delivery_infos = 268 network_behavior_->DequeueDeliverablePackets(time_now_us); 269 for (auto& delivery_info : delivery_infos) { 270 // In the common case where no reordering happens, find will return early 271 // as the first packet will be a match. 272 auto packet_it = 273 std::find_if(packets_in_flight_.begin(), packets_in_flight_.end(), 274 [&delivery_info](StoredPacket& packet_ref) { 275 return reinterpret_cast<uint64_t>(&packet_ref) == 276 delivery_info.packet_id; 277 }); 278 // Check that the packet is in the deque of packets in flight. 279 RTC_CHECK(packet_it != packets_in_flight_.end()); 280 // Check that the packet is not already removed. 281 RTC_DCHECK(!packet_it->removed); 282 283 NetworkPacket packet = std::move(packet_it->packet); 284 packet_it->removed = true; 285 286 // Cleanup of removed packets at the beginning of the deque. 287 while (!packets_in_flight_.empty() && 288 packets_in_flight_.front().removed) { 289 packets_in_flight_.pop_front(); 290 } 291 292 if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) { 293 int64_t added_delay_us = 294 delivery_info.receive_time_us - packet.send_time(); 295 packet.IncrementArrivalTime(added_delay_us); 296 packets_to_deliver.emplace(std::move(packet)); 297 // `time_now_us` might be later than when the packet should have 298 // arrived, due to NetworkProcess being called too late. For stats, use 299 // the time it should have been on the link. 300 total_packet_delay_us_ += added_delay_us; 301 ++sent_packets_; 302 } else { 303 ++dropped_packets_; 304 } 305 } 306 } 307 308 MutexLock lock(&config_lock_); 309 while (!packets_to_deliver.empty()) { 310 NetworkPacket packet = std::move(packets_to_deliver.front()); 311 packets_to_deliver.pop(); 312 DeliverNetworkPacket(&packet); 313 } 314 } 315 316 void FakeNetworkPipe::DeliverNetworkPacket(NetworkPacket* packet) { 317 Transport* transport = packet->transport(); 318 if (transport) { 319 RTC_DCHECK(!receiver_); 320 if (active_transports_.find(transport) == active_transports_.end()) { 321 // Transport has been destroyed, ignore this packet. 322 return; 323 } 324 if (packet->is_rtcp()) { 325 transport->SendRtcp(MakeArrayView(packet->data(), packet->data_length()), 326 packet->packet_options()); 327 } else { 328 transport->SendRtp(MakeArrayView(packet->data(), packet->data_length()), 329 packet->packet_options()); 330 } 331 } else if (receiver_) { 332 int64_t packet_time_us = packet->packet_time_us().value_or(-1); 333 if (packet_time_us != -1) { 334 int64_t queue_time_us = packet->arrival_time() - packet->send_time(); 335 RTC_CHECK(queue_time_us >= 0); 336 packet_time_us += queue_time_us; 337 packet_time_us += (clock_offset_ms_ * 1000); 338 } 339 if (packet->is_rtcp()) { 340 receiver_->DeliverRtcpPacket(std::move(*packet->raw_packet())); 341 } else if (packet->packet_received()) { 342 packet->packet_received()->set_arrival_time( 343 Timestamp::Micros(packet_time_us)); 344 receiver_->DeliverRtpPacket( 345 packet->media_type(), *packet->packet_received(), 346 [](const RtpPacketReceived& packet) { 347 RTC_LOG(LS_WARNING) 348 << "Unexpected failed demuxing packet in FakeNetworkPipe, " 349 "Ssrc: " 350 << packet.Ssrc() << " seq : " << packet.SequenceNumber(); 351 return false; 352 }); 353 } 354 } 355 } 356 357 std::optional<int64_t> FakeNetworkPipe::TimeUntilNextProcess() { 358 MutexLock lock(&process_lock_); 359 std::optional<int64_t> delivery_us = network_behavior_->NextDeliveryTimeUs(); 360 if (delivery_us) { 361 int64_t delay_us = *delivery_us - clock_->TimeInMicroseconds(); 362 return std::max<int64_t>((delay_us + 500) / 1000, 0); 363 } 364 return std::nullopt; 365 } 366 367 bool FakeNetworkPipe::HasReceiver() const { 368 MutexLock lock(&config_lock_); 369 return receiver_ != nullptr; 370 } 371 372 void FakeNetworkPipe::DeliverPacketWithLock(NetworkPacket* packet) { 373 MutexLock lock(&config_lock_); 374 DeliverNetworkPacket(packet); 375 } 376 377 void FakeNetworkPipe::ResetStats() { 378 MutexLock lock(&process_lock_); 379 dropped_packets_ = 0; 380 sent_packets_ = 0; 381 total_packet_delay_us_ = 0; 382 } 383 384 int64_t FakeNetworkPipe::GetTimeInMicroseconds() const { 385 return clock_->TimeInMicroseconds(); 386 } 387 388 } // namespace webrtc