call.cc (59017B)
1 /* 2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "call/call.h" 12 13 #include <algorithm> 14 #include <atomic> 15 #include <cstdint> 16 #include <cstring> 17 #include <map> 18 #include <memory> 19 #include <optional> 20 #include <set> 21 #include <string> 22 #include <utility> 23 #include <vector> 24 25 #include "absl/functional/bind_front.h" 26 #include "absl/strings/string_view.h" 27 #include "api/adaptation/resource.h" 28 #include "api/array_view.h" 29 #include "api/environment/environment.h" 30 #include "api/fec_controller.h" 31 #include "api/media_types.h" 32 #include "api/rtc_error.h" 33 #include "api/rtc_event_log/rtc_event_log.h" 34 #include "api/rtp_headers.h" 35 #include "api/rtp_parameters.h" 36 #include "api/scoped_refptr.h" 37 #include "api/sequence_checker.h" 38 #include "api/task_queue/pending_task_safety_flag.h" 39 #include "api/task_queue/task_queue_base.h" 40 #include "api/transport/bitrate_settings.h" 41 #include "api/transport/network_control.h" 42 #include "api/transport/network_types.h" 43 #include "api/units/data_rate.h" 44 #include "api/units/data_size.h" 45 #include "api/units/time_delta.h" 46 #include "api/units/timestamp.h" 47 #include "audio/audio_receive_stream.h" 48 #include "audio/audio_send_stream.h" 49 #include "audio/audio_state.h" 50 #include "call/adaptation/broadcast_resource_listener.h" 51 #include "call/bitrate_allocator.h" 52 #include "call/call_config.h" 53 #include "call/flexfec_receive_stream.h" 54 #include "call/flexfec_receive_stream_impl.h" 55 #include "call/packet_receiver.h" 56 #include "call/payload_type.h" 57 #include "call/payload_type_picker.h" 58 #include "call/receive_stream.h" 59 #include "call/receive_time_calculator.h" 60 #include "call/rtp_config.h" 61 #include "call/rtp_stream_receiver_controller.h" 62 #include "call/rtp_transport_controller_send.h" 63 #include "call/version.h" 64 #include "call/video_receive_stream.h" 65 #include "call/video_send_stream.h" 66 #include "logging/rtc_event_log/events/rtc_event_audio_receive_stream_config.h" 67 #include "logging/rtc_event_log/events/rtc_event_rtcp_packet_incoming.h" 68 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_incoming.h" 69 #include "logging/rtc_event_log/events/rtc_event_video_receive_stream_config.h" 70 #include "logging/rtc_event_log/events/rtc_event_video_send_stream_config.h" 71 #include "logging/rtc_event_log/rtc_stream_config.h" 72 #include "media/base/codec.h" 73 #include "modules/congestion_controller/include/receive_side_congestion_controller.h" 74 #include "modules/rtp_rtcp/include/flexfec_receiver.h" 75 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" 76 #include "modules/rtp_rtcp/source/rtp_header_extensions.h" 77 #include "modules/rtp_rtcp/source/rtp_packet_received.h" 78 #include "modules/rtp_rtcp/source/rtp_util.h" 79 #include "modules/video_coding/fec_controller_default.h" 80 #include "modules/video_coding/nack_requester.h" 81 #include "rtc_base/checks.h" 82 #include "rtc_base/copy_on_write_buffer.h" 83 #include "rtc_base/cpu_info.h" 84 #include "rtc_base/logging.h" 85 #include "rtc_base/network/sent_packet.h" 86 #include "rtc_base/strings/string_builder.h" 87 #include "rtc_base/system/no_unique_address.h" 88 #include "rtc_base/task_utils/repeating_task.h" 89 #include "rtc_base/thread.h" 90 #include "rtc_base/thread_annotations.h" 91 #include "rtc_base/time_utils.h" 92 #include "rtc_base/trace_event.h" 93 #include "system_wrappers/include/clock.h" 94 #include "system_wrappers/include/metrics.h" 95 #include "video/call_stats2.h" 96 #include "video/config/video_encoder_config.h" 97 #include "video/decode_synchronizer.h" 98 #include "video/send_delay_stats.h" 99 #include "video/stats_counter.h" 100 #include "video/video_receive_stream2.h" 101 #include "video/video_send_stream_impl.h" 102 103 namespace webrtc { 104 105 namespace { 106 107 // In normal operation, the PTS comes from the PeerConnection. 108 // However, it is too much of a bother to insert it in all tests, 109 // so defaulting here. 110 class PayloadTypeSuggesterForTests : public PayloadTypeSuggester { 111 public: 112 PayloadTypeSuggesterForTests() = default; 113 RTCErrorOr<PayloadType> SuggestPayloadType(const std::string& /* mid */, 114 Codec codec) override { 115 return payload_type_picker_.SuggestMapping(codec, nullptr); 116 } 117 RTCError AddLocalMapping(const std::string& /* mid */, 118 PayloadType /* payload_type */, 119 const Codec& /* codec */) override { 120 return RTCError::OK(); 121 } 122 123 private: 124 PayloadTypePicker payload_type_picker_; 125 }; 126 127 const int* FindKeyByValue(const std::map<int, int>& m, int v) { 128 for (const auto& kv : m) { 129 if (kv.second == v) 130 return &kv.first; 131 } 132 return nullptr; 133 } 134 135 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig( 136 const VideoReceiveStreamInterface::Config& config) { 137 auto rtclog_config = std::make_unique<rtclog::StreamConfig>(); 138 rtclog_config->remote_ssrc = config.rtp.remote_ssrc; 139 rtclog_config->local_ssrc = config.rtp.local_ssrc; 140 rtclog_config->rtx_ssrc = config.rtp.rtx_ssrc; 141 rtclog_config->rtcp_mode = config.rtp.rtcp_mode; 142 143 for (const auto& d : config.decoders) { 144 const int* search = 145 FindKeyByValue(config.rtp.rtx_associated_payload_types, d.payload_type); 146 rtclog_config->codecs.emplace_back(d.video_format.name, d.payload_type, 147 search ? *search : 0); 148 } 149 return rtclog_config; 150 } 151 152 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig( 153 const VideoSendStream::Config& config, 154 size_t ssrc_index) { 155 auto rtclog_config = std::make_unique<rtclog::StreamConfig>(); 156 rtclog_config->local_ssrc = config.rtp.ssrcs[ssrc_index]; 157 if (ssrc_index < config.rtp.rtx.ssrcs.size()) { 158 rtclog_config->rtx_ssrc = config.rtp.rtx.ssrcs[ssrc_index]; 159 } 160 rtclog_config->rtcp_mode = config.rtp.rtcp_mode; 161 rtclog_config->rtp_extensions = config.rtp.extensions; 162 163 rtclog_config->codecs.emplace_back(config.rtp.payload_name, 164 config.rtp.payload_type, 165 config.rtp.rtx.payload_type); 166 return rtclog_config; 167 } 168 169 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig( 170 const AudioReceiveStreamInterface::Config& config) { 171 auto rtclog_config = std::make_unique<rtclog::StreamConfig>(); 172 rtclog_config->remote_ssrc = config.rtp.remote_ssrc; 173 rtclog_config->local_ssrc = config.rtp.local_ssrc; 174 return rtclog_config; 175 } 176 177 TaskQueueBase* GetCurrentTaskQueueOrThread() { 178 TaskQueueBase* current = TaskQueueBase::Current(); 179 if (!current) 180 current = ThreadManager::Instance()->CurrentThread(); 181 return current; 182 } 183 184 } // namespace 185 186 namespace internal { 187 188 // Wraps an injected resource in a BroadcastResourceListener and handles adding 189 // and removing adapter resources to individual VideoSendStreams. 190 class ResourceVideoSendStreamForwarder { 191 public: 192 explicit ResourceVideoSendStreamForwarder( 193 scoped_refptr<webrtc::Resource> resource) 194 : broadcast_resource_listener_(resource) { 195 broadcast_resource_listener_.StartListening(); 196 } 197 ~ResourceVideoSendStreamForwarder() { 198 RTC_DCHECK(adapter_resources_.empty()); 199 broadcast_resource_listener_.StopListening(); 200 } 201 202 scoped_refptr<webrtc::Resource> Resource() const { 203 return broadcast_resource_listener_.SourceResource(); 204 } 205 206 void OnCreateVideoSendStream(VideoSendStream* video_send_stream) { 207 RTC_DCHECK(adapter_resources_.find(video_send_stream) == 208 adapter_resources_.end()); 209 auto adapter_resource = 210 broadcast_resource_listener_.CreateAdapterResource(); 211 video_send_stream->AddAdaptationResource(adapter_resource); 212 adapter_resources_.insert( 213 std::make_pair(video_send_stream, adapter_resource)); 214 } 215 216 void OnDestroyVideoSendStream(VideoSendStream* video_send_stream) { 217 auto it = adapter_resources_.find(video_send_stream); 218 RTC_DCHECK(it != adapter_resources_.end()); 219 broadcast_resource_listener_.RemoveAdapterResource(it->second); 220 adapter_resources_.erase(it); 221 } 222 223 private: 224 BroadcastResourceListener broadcast_resource_listener_; 225 std::map<VideoSendStream*, scoped_refptr<webrtc::Resource>> 226 adapter_resources_; 227 }; 228 229 class Call final : public webrtc::Call, 230 public PacketReceiver, 231 public TargetTransferRateObserver, 232 public BitrateAllocator::LimitObserver { 233 public: 234 Call(CallConfig config, 235 std::unique_ptr<RtpTransportControllerSendInterface> transport_send); 236 ~Call() override; 237 238 Call(const Call&) = delete; 239 Call& operator=(const Call&) = delete; 240 241 // Implements webrtc::Call. 242 PacketReceiver* Receiver() override; 243 244 webrtc::AudioSendStream* CreateAudioSendStream( 245 const webrtc::AudioSendStream::Config& config) override; 246 void DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) override; 247 248 webrtc::AudioReceiveStreamInterface* CreateAudioReceiveStream( 249 const webrtc::AudioReceiveStreamInterface::Config& config) override; 250 void DestroyAudioReceiveStream( 251 webrtc::AudioReceiveStreamInterface* receive_stream) override; 252 253 webrtc::VideoSendStream* CreateVideoSendStream( 254 webrtc::VideoSendStream::Config config, 255 VideoEncoderConfig encoder_config) override; 256 webrtc::VideoSendStream* CreateVideoSendStream( 257 webrtc::VideoSendStream::Config config, 258 VideoEncoderConfig encoder_config, 259 std::unique_ptr<FecController> fec_controller) override; 260 void DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) override; 261 262 webrtc::VideoReceiveStreamInterface* CreateVideoReceiveStream( 263 webrtc::VideoReceiveStreamInterface::Config configuration) override; 264 void DestroyVideoReceiveStream( 265 webrtc::VideoReceiveStreamInterface* receive_stream) override; 266 267 FlexfecReceiveStream* CreateFlexfecReceiveStream( 268 const FlexfecReceiveStream::Config config) override; 269 void DestroyFlexfecReceiveStream( 270 FlexfecReceiveStream* receive_stream) override; 271 272 void AddAdaptationResource(scoped_refptr<Resource> resource) override; 273 274 RtpTransportControllerSendInterface* GetTransportControllerSend() override; 275 276 PayloadTypeSuggester* GetPayloadTypeSuggester() override; 277 void SetPayloadTypeSuggester(PayloadTypeSuggester* suggester) override; 278 279 Stats GetStats() const override; 280 281 void SetPreferredRtcpCcAckType( 282 RtcpFeedbackType preferred_rtcp_cc_ack_type) override; 283 std::optional<int> FeedbackAccordingToRfc8888Count() override; 284 std::optional<int> FeedbackAccordingToTransportCcCount() override; 285 286 TaskQueueBase* network_thread() const override; 287 TaskQueueBase* worker_thread() const override; 288 289 void DeliverRtcpPacket(CopyOnWriteBuffer packet) override; 290 291 void DeliverRtpPacket( 292 MediaType media_type, 293 RtpPacketReceived packet, 294 OnUndemuxablePacketHandler undemuxable_packet_handler) override; 295 296 void SignalChannelNetworkState(MediaType media, NetworkState state) override; 297 298 void OnAudioTransportOverheadChanged( 299 int transport_overhead_per_packet) override; 300 301 void OnLocalSsrcUpdated(webrtc::AudioReceiveStreamInterface& stream, 302 uint32_t local_ssrc) override; 303 void OnLocalSsrcUpdated(VideoReceiveStreamInterface& stream, 304 uint32_t local_ssrc) override; 305 void OnLocalSsrcUpdated(FlexfecReceiveStream& stream, 306 uint32_t local_ssrc) override; 307 308 void OnUpdateSyncGroup(webrtc::AudioReceiveStreamInterface& stream, 309 absl::string_view sync_group) override; 310 311 void OnSentPacket(const SentPacketInfo& sent_packet) override; 312 313 // Implements TargetTransferRateObserver, 314 void OnTargetTransferRate(TargetTransferRate msg) override; 315 void OnStartRateUpdate(DataRate start_rate) override; 316 317 // Implements BitrateAllocator::LimitObserver. 318 void OnAllocationLimitsChanged(BitrateAllocationLimits limits) override; 319 320 void SetClientBitratePreferences(const BitrateSettings& preferences) override; 321 322 private: 323 // Thread-compatible class that collects received packet stats and exposes 324 // them as UMA histograms on destruction. 325 class ReceiveStats { 326 public: 327 explicit ReceiveStats(Clock* clock); 328 ~ReceiveStats(); 329 330 void AddReceivedRtcpBytes(int bytes); 331 void AddReceivedAudioBytes(int bytes, webrtc::Timestamp arrival_time); 332 void AddReceivedVideoBytes(int bytes, webrtc::Timestamp arrival_time); 333 334 private: 335 RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_; 336 RateCounter received_bytes_per_second_counter_ 337 RTC_GUARDED_BY(sequence_checker_); 338 RateCounter received_audio_bytes_per_second_counter_ 339 RTC_GUARDED_BY(sequence_checker_); 340 RateCounter received_video_bytes_per_second_counter_ 341 RTC_GUARDED_BY(sequence_checker_); 342 RateCounter received_rtcp_bytes_per_second_counter_ 343 RTC_GUARDED_BY(sequence_checker_); 344 std::optional<Timestamp> first_received_rtp_audio_timestamp_ 345 RTC_GUARDED_BY(sequence_checker_); 346 std::optional<Timestamp> last_received_rtp_audio_timestamp_ 347 RTC_GUARDED_BY(sequence_checker_); 348 std::optional<Timestamp> first_received_rtp_video_timestamp_ 349 RTC_GUARDED_BY(sequence_checker_); 350 std::optional<Timestamp> last_received_rtp_video_timestamp_ 351 RTC_GUARDED_BY(sequence_checker_); 352 }; 353 354 // Thread-compatible class that collects sent packet stats and exposes 355 // them as UMA histograms on destruction, provided SetFirstPacketTime was 356 // called with a non-empty packet timestamp before the destructor. 357 class SendStats { 358 public: 359 explicit SendStats(Clock* clock); 360 ~SendStats(); 361 362 void SetFirstPacketTime(std::optional<Timestamp> first_sent_packet_time); 363 void PauseSendAndPacerBitrateCounters(); 364 void AddTargetBitrateSample(uint32_t target_bitrate_bps); 365 void SetMinAllocatableRate(BitrateAllocationLimits limits); 366 367 private: 368 RTC_NO_UNIQUE_ADDRESS SequenceChecker destructor_sequence_checker_; 369 RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_; 370 Clock* const clock_ RTC_GUARDED_BY(destructor_sequence_checker_); 371 AvgCounter estimated_send_bitrate_kbps_counter_ 372 RTC_GUARDED_BY(sequence_checker_); 373 AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(sequence_checker_); 374 uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(sequence_checker_){ 375 0}; 376 std::optional<Timestamp> first_sent_packet_time_ 377 RTC_GUARDED_BY(destructor_sequence_checker_); 378 }; 379 380 void DeliverRtcp(MediaType media_type, CopyOnWriteBuffer packet) 381 RTC_RUN_ON(network_thread_); 382 383 AudioReceiveStreamImpl* FindAudioStreamForSyncGroup( 384 absl::string_view sync_group) RTC_RUN_ON(worker_thread_); 385 void ConfigureSync(absl::string_view sync_group) RTC_RUN_ON(worker_thread_); 386 387 void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, 388 MediaType media_type) 389 RTC_RUN_ON(worker_thread_); 390 391 bool RegisterReceiveStream(uint32_t ssrc, ReceiveStreamInterface* stream); 392 bool UnregisterReceiveStream(uint32_t ssrc); 393 394 void UpdateAggregateNetworkState(); 395 396 // Ensure that necessary process threads are started, and any required 397 // callbacks have been registered. 398 void EnsureStarted() RTC_RUN_ON(worker_thread_); 399 400 const Environment env_; 401 TaskQueueBase* const worker_thread_; 402 TaskQueueBase* const network_thread_; 403 const std::unique_ptr<DecodeSynchronizer> decode_sync_; 404 RTC_NO_UNIQUE_ADDRESS SequenceChecker send_transport_sequence_checker_; 405 406 const int num_cpu_cores_; 407 const std::unique_ptr<CallStats> call_stats_; 408 const std::unique_ptr<BitrateAllocator> bitrate_allocator_; 409 const CallConfig config_ RTC_GUARDED_BY(worker_thread_); 410 411 NetworkState audio_network_state_ RTC_GUARDED_BY(worker_thread_); 412 NetworkState video_network_state_ RTC_GUARDED_BY(worker_thread_); 413 // TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the 414 // network thread. 415 bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_); 416 417 // Schedules nack periodic processing on behalf of all streams. 418 NackPeriodicProcessor nack_periodic_processor_; 419 420 // Audio, Video, and FlexFEC receive streams are owned by the client that 421 // creates them. 422 // TODO(bugs.webrtc.org/11993): Move audio_receive_streams_, 423 // video_receive_streams_ over to the network thread. 424 std::set<AudioReceiveStreamImpl*> audio_receive_streams_ 425 RTC_GUARDED_BY(worker_thread_); 426 std::set<VideoReceiveStream2*> video_receive_streams_ 427 RTC_GUARDED_BY(worker_thread_); 428 // TODO(bugs.webrtc.org/7135, bugs.webrtc.org/9719): Should eventually be 429 // injected at creation, with a single object in the bundled case. 430 RtpStreamReceiverController audio_receiver_controller_ 431 RTC_GUARDED_BY(worker_thread_); 432 RtpStreamReceiverController video_receiver_controller_ 433 RTC_GUARDED_BY(worker_thread_); 434 435 // This extra map is used for receive processing which is 436 // independent of media type. 437 438 RTC_NO_UNIQUE_ADDRESS SequenceChecker receive_11993_checker_; 439 440 // Audio and Video send streams are owned by the client that creates them. 441 // TODO(bugs.webrtc.org/11993): `audio_send_ssrcs_` and `video_send_ssrcs_` 442 // should be accessed on the network thread. 443 std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_ 444 RTC_GUARDED_BY(worker_thread_); 445 std::map<uint32_t, VideoSendStreamImpl*> video_send_ssrcs_ 446 RTC_GUARDED_BY(worker_thread_); 447 std::set<VideoSendStreamImpl*> video_send_streams_ 448 RTC_GUARDED_BY(worker_thread_); 449 // True if `video_send_streams_` is empty, false if not. The atomic variable 450 // is used to decide UMA send statistics behavior and enables avoiding a 451 // PostTask(). 452 std::atomic<bool> video_send_streams_empty_{true}; 453 454 // Each forwarder wraps an adaptation resource that was added to the call. 455 std::vector<std::unique_ptr<ResourceVideoSendStreamForwarder>> 456 adaptation_resource_forwarders_ RTC_GUARDED_BY(worker_thread_); 457 458 using RtpStateMap = std::map<uint32_t, RtpState>; 459 RtpStateMap suspended_audio_send_ssrcs_ RTC_GUARDED_BY(worker_thread_); 460 RtpStateMap suspended_video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_); 461 462 using RtpPayloadStateMap = std::map<uint32_t, RtpPayloadState>; 463 RtpPayloadStateMap suspended_video_payload_states_ 464 RTC_GUARDED_BY(worker_thread_); 465 466 // TODO(bugs.webrtc.org/11993) ready to move stats access to the network 467 // thread. 468 ReceiveStats receive_stats_ RTC_GUARDED_BY(worker_thread_); 469 SendStats send_stats_ RTC_GUARDED_BY(send_transport_sequence_checker_); 470 // `last_bandwidth_bps_` and `configured_max_padding_bitrate_bps_` being 471 // atomic avoids a PostTask. The variables are used for stats gathering. 472 std::atomic<uint32_t> last_bandwidth_bps_{0}; 473 std::atomic<uint32_t> configured_max_padding_bitrate_bps_{0}; 474 475 ReceiveSideCongestionController receive_side_cc_; 476 RepeatingTaskHandle receive_side_cc_periodic_task_; 477 RepeatingTaskHandle elastic_bandwidth_allocation_task_; 478 479 const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_; 480 481 const std::unique_ptr<SendDelayStats> video_send_delay_stats_; 482 const Timestamp start_of_call_; 483 484 // Note that `task_safety_` needs to be at a greater scope than the task queue 485 // owned by `transport_send_` since calls might arrive on the network thread 486 // while Call is being deleted and the task queue is being torn down. 487 const ScopedTaskSafety task_safety_; 488 489 // Caches transport_send_.get(), to avoid racing with destructor. 490 // Note that this is declared before transport_send_ to ensure that it is not 491 // invalidated until no more tasks can be running on the transport_send_ task 492 // queue. 493 // For more details on the background of this member variable, see: 494 // https://webrtc-review.googlesource.com/c/src/+/63023/9/call/call.cc 495 // https://bugs.chromium.org/p/chromium/issues/detail?id=992640 496 RtpTransportControllerSendInterface* const transport_send_ptr_ 497 RTC_GUARDED_BY(send_transport_sequence_checker_); 498 499 bool is_started_ RTC_GUARDED_BY(worker_thread_) = false; 500 501 // Mechanism for proposing payload types in RTP mappings. 502 PayloadTypeSuggester* pt_suggester_ = nullptr; 503 std::unique_ptr<PayloadTypeSuggesterForTests> owned_pt_suggester_; 504 505 // Sequence checker for outgoing network traffic. Could be the network thread. 506 // Could also be a pacer owned thread or TQ such as the TaskQueueSender. 507 RTC_NO_UNIQUE_ADDRESS SequenceChecker sent_packet_sequence_checker_; 508 std::optional<SentPacketInfo> last_sent_packet_ 509 RTC_GUARDED_BY(sent_packet_sequence_checker_); 510 // Declared last since it will issue callbacks from a task queue. Declaring it 511 // last ensures that it is destroyed first and any running tasks are finished. 512 const std::unique_ptr<RtpTransportControllerSendInterface> transport_send_; 513 }; 514 } // namespace internal 515 516 std::unique_ptr<Call> Call::Create(CallConfig config) { 517 auto transport_send = std::make_unique<RtpTransportControllerSend>( 518 config.ExtractTransportConfig()); 519 520 return std::make_unique<internal::Call>(std::move(config), 521 std::move(transport_send)); 522 } 523 524 // This method here to avoid subclasses has to implement this method. 525 // Call perf test will use Internal::Call::CreateVideoSendStream() to inject 526 // FecController. 527 VideoSendStream* Call::CreateVideoSendStream( 528 VideoSendStream::Config /* config */, 529 VideoEncoderConfig /* encoder_config */, 530 std::unique_ptr<FecController> /* fec_controller */) { 531 return nullptr; 532 } 533 534 namespace internal { 535 536 Call::ReceiveStats::ReceiveStats(Clock* clock) 537 : received_bytes_per_second_counter_(clock, nullptr, false), 538 received_audio_bytes_per_second_counter_(clock, nullptr, false), 539 received_video_bytes_per_second_counter_(clock, nullptr, false), 540 received_rtcp_bytes_per_second_counter_(clock, nullptr, false) { 541 sequence_checker_.Detach(); 542 } 543 544 void Call::ReceiveStats::AddReceivedRtcpBytes(int bytes) { 545 RTC_DCHECK_RUN_ON(&sequence_checker_); 546 if (received_bytes_per_second_counter_.HasSample()) { 547 // First RTP packet has been received. 548 received_bytes_per_second_counter_.Add(static_cast<int>(bytes)); 549 received_rtcp_bytes_per_second_counter_.Add(static_cast<int>(bytes)); 550 } 551 } 552 553 void Call::ReceiveStats::AddReceivedAudioBytes(int bytes, 554 webrtc::Timestamp arrival_time) { 555 RTC_DCHECK_RUN_ON(&sequence_checker_); 556 received_bytes_per_second_counter_.Add(bytes); 557 received_audio_bytes_per_second_counter_.Add(bytes); 558 if (!first_received_rtp_audio_timestamp_) 559 first_received_rtp_audio_timestamp_ = arrival_time; 560 last_received_rtp_audio_timestamp_ = arrival_time; 561 } 562 563 void Call::ReceiveStats::AddReceivedVideoBytes(int bytes, 564 webrtc::Timestamp arrival_time) { 565 RTC_DCHECK_RUN_ON(&sequence_checker_); 566 received_bytes_per_second_counter_.Add(bytes); 567 received_video_bytes_per_second_counter_.Add(bytes); 568 if (!first_received_rtp_video_timestamp_) 569 first_received_rtp_video_timestamp_ = arrival_time; 570 last_received_rtp_video_timestamp_ = arrival_time; 571 } 572 573 Call::ReceiveStats::~ReceiveStats() { 574 RTC_DCHECK_RUN_ON(&sequence_checker_); 575 if (first_received_rtp_audio_timestamp_) { 576 RTC_HISTOGRAM_COUNTS_100000( 577 "WebRTC.Call.TimeReceivingAudioRtpPacketsInSeconds", 578 (*last_received_rtp_audio_timestamp_ - 579 *first_received_rtp_audio_timestamp_) 580 .seconds()); 581 } 582 if (first_received_rtp_video_timestamp_) { 583 RTC_HISTOGRAM_COUNTS_100000( 584 "WebRTC.Call.TimeReceivingVideoRtpPacketsInSeconds", 585 (*last_received_rtp_video_timestamp_ - 586 *first_received_rtp_video_timestamp_) 587 .seconds()); 588 } 589 const int kMinRequiredPeriodicSamples = 5; 590 AggregatedStats video_bytes_per_sec = 591 received_video_bytes_per_second_counter_.GetStats(); 592 if (video_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { 593 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.VideoBitrateReceivedInKbps", 594 video_bytes_per_sec.average * 8 / 1000); 595 RTC_LOG(LS_INFO) << "WebRTC.Call.VideoBitrateReceivedInBps, " 596 << video_bytes_per_sec.ToStringWithMultiplier(8); 597 } 598 AggregatedStats audio_bytes_per_sec = 599 received_audio_bytes_per_second_counter_.GetStats(); 600 if (audio_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { 601 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.AudioBitrateReceivedInKbps", 602 audio_bytes_per_sec.average * 8 / 1000); 603 RTC_LOG(LS_INFO) << "WebRTC.Call.AudioBitrateReceivedInBps, " 604 << audio_bytes_per_sec.ToStringWithMultiplier(8); 605 } 606 AggregatedStats rtcp_bytes_per_sec = 607 received_rtcp_bytes_per_second_counter_.GetStats(); 608 if (rtcp_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { 609 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.RtcpBitrateReceivedInBps", 610 rtcp_bytes_per_sec.average * 8); 611 RTC_LOG(LS_INFO) << "WebRTC.Call.RtcpBitrateReceivedInBps, " 612 << rtcp_bytes_per_sec.ToStringWithMultiplier(8); 613 } 614 AggregatedStats recv_bytes_per_sec = 615 received_bytes_per_second_counter_.GetStats(); 616 if (recv_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) { 617 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.BitrateReceivedInKbps", 618 recv_bytes_per_sec.average * 8 / 1000); 619 RTC_LOG(LS_INFO) << "WebRTC.Call.BitrateReceivedInBps, " 620 << recv_bytes_per_sec.ToStringWithMultiplier(8); 621 } 622 } 623 624 Call::SendStats::SendStats(Clock* clock) 625 : clock_(clock), 626 estimated_send_bitrate_kbps_counter_(clock, nullptr, true), 627 pacer_bitrate_kbps_counter_(clock, nullptr, true) { 628 destructor_sequence_checker_.Detach(); 629 sequence_checker_.Detach(); 630 } 631 632 Call::SendStats::~SendStats() { 633 RTC_DCHECK_RUN_ON(&destructor_sequence_checker_); 634 if (!first_sent_packet_time_) 635 return; 636 637 TimeDelta elapsed = clock_->CurrentTime() - *first_sent_packet_time_; 638 if (elapsed < metrics::kMinRunTime) 639 return; 640 641 const int kMinRequiredPeriodicSamples = 5; 642 AggregatedStats send_bitrate_stats = 643 estimated_send_bitrate_kbps_counter_.ProcessAndGetStats(); 644 if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { 645 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps", 646 send_bitrate_stats.average); 647 RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, " 648 << send_bitrate_stats.ToString(); 649 } 650 AggregatedStats pacer_bitrate_stats = 651 pacer_bitrate_kbps_counter_.ProcessAndGetStats(); 652 if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) { 653 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps", 654 pacer_bitrate_stats.average); 655 RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, " 656 << pacer_bitrate_stats.ToString(); 657 } 658 } 659 660 void Call::SendStats::SetFirstPacketTime( 661 std::optional<Timestamp> first_sent_packet_time) { 662 RTC_DCHECK_RUN_ON(&destructor_sequence_checker_); 663 first_sent_packet_time_ = first_sent_packet_time; 664 } 665 666 void Call::SendStats::PauseSendAndPacerBitrateCounters() { 667 RTC_DCHECK_RUN_ON(&sequence_checker_); 668 estimated_send_bitrate_kbps_counter_.ProcessAndPause(); 669 pacer_bitrate_kbps_counter_.ProcessAndPause(); 670 } 671 672 void Call::SendStats::AddTargetBitrateSample(uint32_t target_bitrate_bps) { 673 RTC_DCHECK_RUN_ON(&sequence_checker_); 674 estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000); 675 // Pacer bitrate may be higher than bitrate estimate if enforcing min 676 // bitrate. 677 uint32_t pacer_bitrate_bps = 678 std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_); 679 pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000); 680 } 681 682 void Call::SendStats::SetMinAllocatableRate(BitrateAllocationLimits limits) { 683 RTC_DCHECK_RUN_ON(&sequence_checker_); 684 min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps(); 685 } 686 687 Call::Call(CallConfig config, 688 std::unique_ptr<RtpTransportControllerSendInterface> transport_send) 689 : env_(config.env), 690 worker_thread_(GetCurrentTaskQueueOrThread()), 691 // If `network_task_queue_` was set to nullptr, network related calls 692 // must be made on `worker_thread_` (i.e. they're one and the same). 693 network_thread_(config.network_task_queue_ ? config.network_task_queue_ 694 : worker_thread_), 695 decode_sync_( 696 config.decode_metronome 697 ? std::make_unique<DecodeSynchronizer>(&env_.clock(), 698 config.decode_metronome, 699 worker_thread_) 700 : nullptr), 701 num_cpu_cores_(cpu_info::DetectNumberOfCores()), 702 call_stats_(new CallStats(&env_.clock(), worker_thread_)), 703 bitrate_allocator_(new BitrateAllocator( 704 this, 705 GetElasticRateAllocationFieldTrialParameter(env_.field_trials()))), 706 config_(std::move(config)), 707 audio_network_state_(kNetworkDown), 708 video_network_state_(kNetworkDown), 709 aggregate_network_up_(false), 710 receive_stats_(&env_.clock()), 711 send_stats_(&env_.clock()), 712 receive_side_cc_(env_, 713 absl::bind_front(&PacketRouter::SendCombinedRtcpPacket, 714 transport_send->packet_router()), 715 absl::bind_front(&PacketRouter::SendRemb, 716 transport_send->packet_router())), 717 receive_time_calculator_( 718 ReceiveTimeCalculator::CreateFromFieldTrial(env_.field_trials())), 719 video_send_delay_stats_(new SendDelayStats(&env_.clock())), 720 start_of_call_(env_.clock().CurrentTime()), 721 transport_send_ptr_(transport_send.get()), 722 transport_send_(std::move(transport_send)) { 723 RTC_DCHECK(network_thread_); 724 RTC_DCHECK(worker_thread_->IsCurrent()); 725 726 receive_11993_checker_.Detach(); 727 send_transport_sequence_checker_.Detach(); 728 sent_packet_sequence_checker_.Detach(); 729 730 // Do not remove this call; it is here to convince the compiler that the 731 // WebRTC source timestamp string needs to be in the final binary. 732 LoadWebRTCVersionInRegister(); 733 734 call_stats_->RegisterStatsObserver(&receive_side_cc_); 735 736 ReceiveSideCongestionController* receive_side_cc = &receive_side_cc_; 737 receive_side_cc_periodic_task_ = RepeatingTaskHandle::Start( 738 worker_thread_, 739 [receive_side_cc] { return receive_side_cc->MaybeProcess(); }, 740 TaskQueueBase::DelayPrecision::kLow, &env_.clock()); 741 742 // TODO(b/350555527): Remove after experiment 743 if (GetElasticRateAllocationFieldTrialParameter(env_.field_trials()) != 744 DataRate::Zero()) { 745 elastic_bandwidth_allocation_task_ = RepeatingTaskHandle::Start( 746 worker_thread_, 747 [this] { 748 TimeDelta next_schedule_interval = TimeDelta::Millis(25); 749 if (bitrate_allocator_) { 750 if (!bitrate_allocator_->RecomputeAllocationIfNeeded()) 751 next_schedule_interval = TimeDelta::Millis(300); 752 } 753 return next_schedule_interval; 754 }, 755 TaskQueueBase::DelayPrecision::kLow, &env_.clock()); 756 } 757 } 758 759 Call::~Call() { 760 RTC_DCHECK_RUN_ON(worker_thread_); 761 762 RTC_CHECK(audio_send_ssrcs_.empty()); 763 RTC_CHECK(video_send_ssrcs_.empty()); 764 RTC_CHECK(video_send_streams_.empty()); 765 RTC_CHECK(audio_receive_streams_.empty()); 766 RTC_CHECK(video_receive_streams_.empty()); 767 768 receive_side_cc_periodic_task_.Stop(); 769 elastic_bandwidth_allocation_task_.Stop(); 770 call_stats_->DeregisterStatsObserver(&receive_side_cc_); 771 send_stats_.SetFirstPacketTime(transport_send_->GetFirstPacketTime()); 772 773 RTC_HISTOGRAM_COUNTS_100000( 774 "WebRTC.Call.LifetimeInSeconds", 775 (env_.clock().CurrentTime() - start_of_call_).seconds()); 776 } 777 778 void Call::EnsureStarted() { 779 if (is_started_) { 780 return; 781 } 782 is_started_ = true; 783 784 call_stats_->EnsureStarted(); 785 786 // This call seems to kick off a number of things, so probably better left 787 // off being kicked off on request rather than in the ctor. 788 transport_send_->RegisterTargetTransferRateObserver(this); 789 790 transport_send_->EnsureStarted(); 791 } 792 793 void Call::SetClientBitratePreferences(const BitrateSettings& preferences) { 794 RTC_DCHECK_RUN_ON(worker_thread_); 795 GetTransportControllerSend()->SetClientBitratePreferences(preferences); 796 } 797 798 PacketReceiver* Call::Receiver() { 799 return this; 800 } 801 802 webrtc::AudioSendStream* Call::CreateAudioSendStream( 803 const webrtc::AudioSendStream::Config& config) { 804 TRACE_EVENT0("webrtc", "Call::CreateAudioSendStream"); 805 RTC_DCHECK_RUN_ON(worker_thread_); 806 807 EnsureStarted(); 808 809 // Stream config is logged in AudioSendStream::ConfigureStream, as it may 810 // change during the stream's lifetime. 811 std::optional<RtpState> suspended_rtp_state; 812 { 813 const auto& iter = suspended_audio_send_ssrcs_.find(config.rtp.ssrc); 814 if (iter != suspended_audio_send_ssrcs_.end()) { 815 suspended_rtp_state.emplace(iter->second); 816 } 817 } 818 819 AudioSendStream* send_stream = 820 new AudioSendStream(env_, config, config_.audio_state, 821 transport_send_.get(), bitrate_allocator_.get(), 822 call_stats_->AsRtcpRttStats(), suspended_rtp_state); 823 RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == 824 audio_send_ssrcs_.end()); 825 audio_send_ssrcs_[config.rtp.ssrc] = send_stream; 826 827 UpdateAggregateNetworkState(); 828 829 return send_stream; 830 } 831 832 void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { 833 TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream"); 834 RTC_DCHECK_RUN_ON(worker_thread_); 835 RTC_DCHECK(send_stream != nullptr); 836 837 send_stream->Stop(); 838 839 const uint32_t ssrc = send_stream->GetConfig().rtp.ssrc; 840 webrtc::internal::AudioSendStream* audio_send_stream = 841 static_cast<webrtc::internal::AudioSendStream*>(send_stream); 842 suspended_audio_send_ssrcs_[ssrc] = audio_send_stream->GetRtpState(); 843 844 size_t num_deleted = audio_send_ssrcs_.erase(ssrc); 845 RTC_DCHECK_EQ(1, num_deleted); 846 847 UpdateAggregateNetworkState(); 848 849 delete send_stream; 850 } 851 852 webrtc::AudioReceiveStreamInterface* Call::CreateAudioReceiveStream( 853 const webrtc::AudioReceiveStreamInterface::Config& config) { 854 TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream"); 855 RTC_DCHECK_RUN_ON(worker_thread_); 856 EnsureStarted(); 857 env_.event_log().Log(std::make_unique<RtcEventAudioReceiveStreamConfig>( 858 CreateRtcLogStreamConfig(config))); 859 860 AudioReceiveStreamImpl* receive_stream = new AudioReceiveStreamImpl( 861 env_, transport_send_->packet_router(), config_.neteq_factory, config, 862 config_.audio_state); 863 audio_receive_streams_.insert(receive_stream); 864 865 // TODO(bugs.webrtc.org/11993): Make the registration on the network thread 866 // (asynchronously). The registration and `audio_receiver_controller_` need 867 // to live on the network thread. 868 receive_stream->RegisterWithTransport(&audio_receiver_controller_); 869 870 ConfigureSync(config.sync_group); 871 872 UpdateAggregateNetworkState(); 873 return receive_stream; 874 } 875 876 void Call::DestroyAudioReceiveStream( 877 webrtc::AudioReceiveStreamInterface* receive_stream) { 878 TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream"); 879 RTC_DCHECK_RUN_ON(worker_thread_); 880 RTC_DCHECK(receive_stream != nullptr); 881 webrtc::AudioReceiveStreamImpl* audio_receive_stream = 882 static_cast<webrtc::AudioReceiveStreamImpl*>(receive_stream); 883 884 // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync 885 // and UpdateAggregateNetworkState on the network thread. The call to 886 // `UnregisterFromTransport` should also happen on the network thread. 887 audio_receive_stream->UnregisterFromTransport(); 888 889 uint32_t ssrc = audio_receive_stream->remote_ssrc(); 890 receive_side_cc_.RemoveStream(ssrc); 891 892 audio_receive_streams_.erase(audio_receive_stream); 893 894 // After calling erase(), call ConfigureSync. This will clear associated 895 // video streams or associate them with a different audio stream if one exists 896 // for this sync_group. 897 ConfigureSync(audio_receive_stream->sync_group()); 898 899 UpdateAggregateNetworkState(); 900 // TODO(bugs.webrtc.org/11993): Consider if deleting `audio_receive_stream` 901 // on the network thread would be better or if we'd need to tear down the 902 // state in two phases. 903 delete audio_receive_stream; 904 } 905 906 // This method can be used for Call tests with external fec controller factory. 907 webrtc::VideoSendStream* Call::CreateVideoSendStream( 908 webrtc::VideoSendStream::Config config, 909 VideoEncoderConfig encoder_config, 910 std::unique_ptr<FecController> fec_controller) { 911 TRACE_EVENT0("webrtc", "Call::CreateVideoSendStream"); 912 RTC_DCHECK_RUN_ON(worker_thread_); 913 914 EnsureStarted(); 915 916 video_send_delay_stats_->AddSsrcs(config); 917 for (size_t ssrc_index = 0; ssrc_index < config.rtp.ssrcs.size(); 918 ++ssrc_index) { 919 env_.event_log().Log(std::make_unique<RtcEventVideoSendStreamConfig>( 920 CreateRtcLogStreamConfig(config, ssrc_index))); 921 } 922 923 // TODO(mflodman): Base the start bitrate on a current bandwidth estimate, if 924 // the call has already started. 925 // Copy ssrcs from `config` since `config` is moved. 926 std::vector<uint32_t> ssrcs = config.rtp.ssrcs; 927 928 VideoSendStreamImpl* send_stream = new VideoSendStreamImpl( 929 env_, num_cpu_cores_, call_stats_->AsRtcpRttStats(), 930 transport_send_.get(), config_.encode_metronome, bitrate_allocator_.get(), 931 video_send_delay_stats_.get(), std::move(config), 932 std::move(encoder_config), suspended_video_send_ssrcs_, 933 suspended_video_payload_states_, std::move(fec_controller)); 934 935 for (uint32_t ssrc : ssrcs) { 936 RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end()); 937 video_send_ssrcs_[ssrc] = send_stream; 938 } 939 video_send_streams_.insert(send_stream); 940 video_send_streams_empty_.store(false, std::memory_order_relaxed); 941 942 // Forward resources that were previously added to the call to the new stream. 943 for (const auto& resource_forwarder : adaptation_resource_forwarders_) { 944 resource_forwarder->OnCreateVideoSendStream(send_stream); 945 } 946 947 UpdateAggregateNetworkState(); 948 949 return send_stream; 950 } 951 952 webrtc::VideoSendStream* Call::CreateVideoSendStream( 953 webrtc::VideoSendStream::Config config, 954 VideoEncoderConfig encoder_config) { 955 RTC_DCHECK_RUN_ON(worker_thread_); 956 if (config_.fec_controller_factory) { 957 RTC_LOG(LS_INFO) << "External FEC Controller will be used."; 958 } 959 std::unique_ptr<FecController> fec_controller = 960 config_.fec_controller_factory 961 ? config_.fec_controller_factory->CreateFecController(env_) 962 : std::make_unique<FecControllerDefault>(env_); 963 return CreateVideoSendStream(std::move(config), std::move(encoder_config), 964 std::move(fec_controller)); 965 } 966 967 void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) { 968 TRACE_EVENT0("webrtc", "Call::DestroyVideoSendStream"); 969 RTC_DCHECK(send_stream != nullptr); 970 RTC_DCHECK_RUN_ON(worker_thread_); 971 972 VideoSendStreamImpl* send_stream_impl = 973 static_cast<VideoSendStreamImpl*>(send_stream); 974 975 auto it = video_send_ssrcs_.begin(); 976 while (it != video_send_ssrcs_.end()) { 977 if (it->second == static_cast<VideoSendStreamImpl*>(send_stream)) { 978 send_stream_impl = it->second; 979 video_send_ssrcs_.erase(it++); 980 } else { 981 ++it; 982 } 983 } 984 985 // Stop forwarding resources to the stream being destroyed. 986 for (const auto& resource_forwarder : adaptation_resource_forwarders_) { 987 resource_forwarder->OnDestroyVideoSendStream(send_stream_impl); 988 } 989 video_send_streams_.erase(send_stream_impl); 990 if (video_send_streams_.empty()) 991 video_send_streams_empty_.store(true, std::memory_order_relaxed); 992 993 VideoSendStreamImpl::RtpStateMap rtp_states; 994 VideoSendStreamImpl::RtpPayloadStateMap rtp_payload_states; 995 send_stream_impl->StopPermanentlyAndGetRtpStates(&rtp_states, 996 &rtp_payload_states); 997 for (const auto& kv : rtp_states) { 998 suspended_video_send_ssrcs_[kv.first] = kv.second; 999 } 1000 for (const auto& kv : rtp_payload_states) { 1001 suspended_video_payload_states_[kv.first] = kv.second; 1002 } 1003 1004 UpdateAggregateNetworkState(); 1005 // TODO(tommi): consider deleting on the same thread as runs 1006 // StopPermanentlyAndGetRtpStates. 1007 delete send_stream_impl; 1008 } 1009 1010 webrtc::VideoReceiveStreamInterface* Call::CreateVideoReceiveStream( 1011 webrtc::VideoReceiveStreamInterface::Config configuration) { 1012 TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream"); 1013 RTC_DCHECK_RUN_ON(worker_thread_); 1014 1015 EnsureStarted(); 1016 1017 env_.event_log().Log(std::make_unique<RtcEventVideoReceiveStreamConfig>( 1018 CreateRtcLogStreamConfig(configuration))); 1019 1020 // TODO(bugs.webrtc.org/11993): Move the registration between `receive_stream` 1021 // and `video_receiver_controller_` out of VideoReceiveStream2 construction 1022 // and set it up asynchronously on the network thread (the registration and 1023 // `video_receiver_controller_` need to live on the network thread). 1024 // TODO(crbug.com/1381982): Re-enable decode synchronizer once the Chromium 1025 // API has adapted to the new Metronome interface. 1026 VideoReceiveStream2* receive_stream = new VideoReceiveStream2( 1027 env_, this, num_cpu_cores_, transport_send_->packet_router(), 1028 std::move(configuration), call_stats_.get(), 1029 std::make_unique<VCMTiming>(&env_.clock(), env_.field_trials()), 1030 &nack_periodic_processor_, decode_sync_.get()); 1031 // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network 1032 // thread. 1033 receive_stream->RegisterWithTransport(&video_receiver_controller_); 1034 video_receive_streams_.insert(receive_stream); 1035 1036 ConfigureSync(receive_stream->sync_group()); 1037 1038 receive_stream->SignalNetworkState(video_network_state_); 1039 UpdateAggregateNetworkState(); 1040 return receive_stream; 1041 } 1042 1043 void Call::DestroyVideoReceiveStream( 1044 webrtc::VideoReceiveStreamInterface* receive_stream) { 1045 TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream"); 1046 RTC_DCHECK_RUN_ON(worker_thread_); 1047 RTC_DCHECK(receive_stream != nullptr); 1048 VideoReceiveStream2* receive_stream_impl = 1049 static_cast<VideoReceiveStream2*>(receive_stream); 1050 // TODO(bugs.webrtc.org/11993): Unregister on the network thread. 1051 receive_stream_impl->UnregisterFromTransport(); 1052 video_receive_streams_.erase(receive_stream_impl); 1053 ConfigureSync(receive_stream_impl->sync_group()); 1054 1055 receive_side_cc_.RemoveStream(receive_stream_impl->remote_ssrc()); 1056 1057 UpdateAggregateNetworkState(); 1058 delete receive_stream_impl; 1059 } 1060 1061 FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( 1062 const FlexfecReceiveStream::Config config) { 1063 TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream"); 1064 RTC_DCHECK_RUN_ON(worker_thread_); 1065 1066 // Unlike the video and audio receive streams, FlexfecReceiveStream implements 1067 // RtpPacketSinkInterface itself, and hence its constructor passes its `this` 1068 // pointer to video_receiver_controller_->CreateStream(). Calling the 1069 // constructor while on the worker thread ensures that we don't call 1070 // OnRtpPacket until the constructor is finished and the object is 1071 // in a valid state, since OnRtpPacket runs on the same thread. 1072 FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl( 1073 env_, std::move(config), &video_receiver_controller_, 1074 call_stats_->AsRtcpRttStats()); 1075 1076 // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network 1077 // thread. 1078 receive_stream->RegisterWithTransport(&video_receiver_controller_); 1079 // TODO(brandtr): Store config in RtcEventLog here. 1080 1081 return receive_stream; 1082 } 1083 1084 void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { 1085 TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream"); 1086 RTC_DCHECK_RUN_ON(worker_thread_); 1087 1088 FlexfecReceiveStreamImpl* receive_stream_impl = 1089 static_cast<FlexfecReceiveStreamImpl*>(receive_stream); 1090 // TODO(bugs.webrtc.org/11993): Unregister on the network thread. 1091 receive_stream_impl->UnregisterFromTransport(); 1092 1093 auto ssrc = receive_stream_impl->remote_ssrc(); 1094 // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be 1095 // destroyed. 1096 receive_side_cc_.RemoveStream(ssrc); 1097 1098 delete receive_stream_impl; 1099 } 1100 1101 void Call::AddAdaptationResource(scoped_refptr<Resource> resource) { 1102 RTC_DCHECK_RUN_ON(worker_thread_); 1103 adaptation_resource_forwarders_.push_back( 1104 std::make_unique<ResourceVideoSendStreamForwarder>(resource)); 1105 const auto& resource_forwarder = adaptation_resource_forwarders_.back(); 1106 for (VideoSendStream* send_stream : video_send_streams_) { 1107 resource_forwarder->OnCreateVideoSendStream(send_stream); 1108 } 1109 } 1110 1111 RtpTransportControllerSendInterface* Call::GetTransportControllerSend() { 1112 return transport_send_.get(); 1113 } 1114 1115 PayloadTypeSuggester* Call::GetPayloadTypeSuggester() { 1116 // TODO: https://issues.webrtc.org/360058654 - make mandatory at 1117 // initialization. Currently, only some channels use it. 1118 RTC_DCHECK_RUN_ON(worker_thread_); 1119 if (!pt_suggester_) { 1120 // Make something that will work most of the time for testing. 1121 owned_pt_suggester_ = std::make_unique<PayloadTypeSuggesterForTests>(); 1122 SetPayloadTypeSuggester(owned_pt_suggester_.get()); 1123 } 1124 return pt_suggester_; 1125 } 1126 1127 void Call::SetPayloadTypeSuggester(PayloadTypeSuggester* suggester) { 1128 RTC_CHECK(!pt_suggester_) 1129 << "SetPayloadTypeSuggester can be called only once"; 1130 pt_suggester_ = suggester; 1131 } 1132 1133 Call::Stats Call::GetStats() const { 1134 RTC_DCHECK_RUN_ON(worker_thread_); 1135 1136 Stats stats; 1137 // TODO(srte): It is unclear if we only want to report queues if network is 1138 // available. 1139 stats.pacer_delay_ms = 1140 aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0; 1141 1142 stats.rtt_ms = call_stats_->LastProcessedRtt(); 1143 1144 // Fetch available send/receive bitrates. 1145 stats.recv_bandwidth_bps = receive_side_cc_.LatestReceiveSideEstimate().bps(); 1146 stats.send_bandwidth_bps = 1147 last_bandwidth_bps_.load(std::memory_order_relaxed); 1148 stats.max_padding_bitrate_bps = 1149 configured_max_padding_bitrate_bps_.load(std::memory_order_relaxed); 1150 1151 // Congestion control feedback messages received. 1152 stats.ccfb_messages_received = 1153 transport_send_->ReceivedCongestionControlFeedbackCount(); 1154 1155 return stats; 1156 } 1157 1158 void Call::SetPreferredRtcpCcAckType( 1159 RtcpFeedbackType preferred_rtcp_cc_ack_type) { 1160 if (preferred_rtcp_cc_ack_type == RtcpFeedbackType::CCFB) { 1161 receive_side_cc_.EnableSendCongestionControlFeedbackAccordingToRfc8888(); 1162 transport_send_->EnableCongestionControlFeedbackAccordingToRfc8888(); 1163 } // else default to transport CC if correct header extension is negotiated 1164 } 1165 1166 std::optional<int> Call::FeedbackAccordingToRfc8888Count() { 1167 return transport_send_->ReceivedCongestionControlFeedbackCount(); 1168 } 1169 1170 std::optional<int> Call::FeedbackAccordingToTransportCcCount() { 1171 return transport_send_->ReceivedTransportCcFeedbackCount(); 1172 } 1173 1174 TaskQueueBase* Call::network_thread() const { 1175 return network_thread_; 1176 } 1177 1178 TaskQueueBase* Call::worker_thread() const { 1179 return worker_thread_; 1180 } 1181 1182 void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { 1183 RTC_DCHECK_RUN_ON(network_thread_); 1184 RTC_DCHECK(media == MediaType::AUDIO || media == MediaType::VIDEO); 1185 1186 auto closure = [this, media, state]() { 1187 // TODO(bugs.webrtc.org/11993): Move this over to the network thread. 1188 RTC_DCHECK_RUN_ON(worker_thread_); 1189 if (media == MediaType::AUDIO) { 1190 audio_network_state_ = state; 1191 } else { 1192 RTC_DCHECK_EQ(media, MediaType::VIDEO); 1193 video_network_state_ = state; 1194 } 1195 1196 // TODO(tommi): Is it necessary to always do this, including if there 1197 // was no change in state? 1198 UpdateAggregateNetworkState(); 1199 1200 // TODO(tommi): Is it right to do this if media == AUDIO? 1201 for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { 1202 video_receive_stream->SignalNetworkState(video_network_state_); 1203 } 1204 }; 1205 1206 if (network_thread_ == worker_thread_) { 1207 closure(); 1208 } else { 1209 // TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to 1210 // post to the worker thread. 1211 worker_thread_->PostTask(SafeTask(task_safety_.flag(), std::move(closure))); 1212 } 1213 } 1214 1215 void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) { 1216 RTC_DCHECK_RUN_ON(network_thread_); 1217 worker_thread_->PostTask( 1218 SafeTask(task_safety_.flag(), [this, transport_overhead_per_packet]() { 1219 // TODO(bugs.webrtc.org/11993): Move this over to the network thread. 1220 RTC_DCHECK_RUN_ON(worker_thread_); 1221 for (auto& kv : audio_send_ssrcs_) { 1222 kv.second->SetTransportOverhead(transport_overhead_per_packet); 1223 } 1224 })); 1225 } 1226 1227 void Call::UpdateAggregateNetworkState() { 1228 // TODO(bugs.webrtc.org/11993): Move this over to the network thread. 1229 // RTC_DCHECK_RUN_ON(network_thread_); 1230 1231 RTC_DCHECK_RUN_ON(worker_thread_); 1232 1233 bool have_audio = 1234 !audio_send_ssrcs_.empty() || !audio_receive_streams_.empty(); 1235 bool have_video = 1236 !video_send_ssrcs_.empty() || !video_receive_streams_.empty(); 1237 1238 bool aggregate_network_up = 1239 ((have_video && video_network_state_ == kNetworkUp) || 1240 (have_audio && audio_network_state_ == kNetworkUp)); 1241 1242 if (aggregate_network_up != aggregate_network_up_) { 1243 RTC_LOG(LS_INFO) 1244 << "UpdateAggregateNetworkState: aggregate_state change to " 1245 << (aggregate_network_up ? "up" : "down"); 1246 } else { 1247 RTC_LOG(LS_VERBOSE) 1248 << "UpdateAggregateNetworkState: aggregate_state remains at " 1249 << (aggregate_network_up ? "up" : "down"); 1250 } 1251 aggregate_network_up_ = aggregate_network_up; 1252 1253 transport_send_->OnNetworkAvailability(aggregate_network_up); 1254 } 1255 1256 void Call::OnLocalSsrcUpdated(webrtc::AudioReceiveStreamInterface& stream, 1257 uint32_t local_ssrc) { 1258 RTC_DCHECK_RUN_ON(worker_thread_); 1259 static_cast<webrtc::AudioReceiveStreamImpl&>(stream).SetLocalSsrc(local_ssrc); 1260 } 1261 1262 void Call::OnLocalSsrcUpdated(VideoReceiveStreamInterface& stream, 1263 uint32_t local_ssrc) { 1264 RTC_DCHECK_RUN_ON(worker_thread_); 1265 static_cast<VideoReceiveStream2&>(stream).SetLocalSsrc(local_ssrc); 1266 } 1267 1268 void Call::OnLocalSsrcUpdated(FlexfecReceiveStream& stream, 1269 uint32_t local_ssrc) { 1270 RTC_DCHECK_RUN_ON(worker_thread_); 1271 static_cast<FlexfecReceiveStreamImpl&>(stream).SetLocalSsrc(local_ssrc); 1272 } 1273 1274 void Call::OnUpdateSyncGroup(webrtc::AudioReceiveStreamInterface& stream, 1275 absl::string_view sync_group) { 1276 RTC_DCHECK_RUN_ON(worker_thread_); 1277 webrtc::AudioReceiveStreamImpl& receive_stream = 1278 static_cast<webrtc::AudioReceiveStreamImpl&>(stream); 1279 receive_stream.SetSyncGroup(sync_group); 1280 ConfigureSync(sync_group); 1281 } 1282 1283 void Call::OnSentPacket(const SentPacketInfo& sent_packet) { 1284 RTC_DCHECK_RUN_ON(&sent_packet_sequence_checker_); 1285 // When bundling is in effect, multiple senders may be sharing the same 1286 // transport. It means every |sent_packet| will be multiply notified from 1287 // different channels, WebRtcVoiceMediaChannel or WebRtcVideoChannel. Record 1288 // |last_sent_packet_| to deduplicate redundant notifications to downstream. 1289 // (https://crbug.com/webrtc/13437): Pass all packets without a |packet_id| to 1290 // downstream. 1291 if (last_sent_packet_.has_value() && last_sent_packet_->packet_id != -1 && 1292 last_sent_packet_->packet_id == sent_packet.packet_id && 1293 last_sent_packet_->send_time_ms == sent_packet.send_time_ms) { 1294 return; 1295 } 1296 last_sent_packet_ = sent_packet; 1297 1298 // In production and with most tests, this method will be called on the 1299 // network thread. However some test classes such as DirectTransport don't 1300 // incorporate a network thread. This means that tests for RtpSenderEgress 1301 // and ModuleRtpRtcpImpl2 that use DirectTransport, will call this method 1302 // on a ProcessThread. This is alright as is since we forward the call to 1303 // implementations that either just do a PostTask or use locking. 1304 video_send_delay_stats_->OnSentPacket(sent_packet.packet_id, 1305 env_.clock().CurrentTime()); 1306 transport_send_->OnSentPacket(sent_packet); 1307 } 1308 1309 void Call::OnStartRateUpdate(DataRate start_rate) { 1310 RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); 1311 bitrate_allocator_->UpdateStartRate(start_rate.bps<uint32_t>()); 1312 } 1313 1314 void Call::OnTargetTransferRate(TargetTransferRate msg) { 1315 RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); 1316 1317 uint32_t target_bitrate_bps = msg.target_rate.bps(); 1318 // For controlling the rate of feedback messages. 1319 receive_side_cc_.OnBitrateChanged(target_bitrate_bps); 1320 bitrate_allocator_->OnNetworkEstimateChanged(msg); 1321 1322 last_bandwidth_bps_.store(target_bitrate_bps, std::memory_order_relaxed); 1323 1324 // Ignore updates if bitrate is zero (the aggregate network state is 1325 // down) or if we're not sending video. 1326 // Using `video_send_streams_empty_` is racy but as the caller can't 1327 // reasonably expect synchronize with changes in `video_send_streams_` (being 1328 // on `send_transport_sequence_checker`), we can avoid a PostTask this way. 1329 if (target_bitrate_bps == 0 || 1330 video_send_streams_empty_.load(std::memory_order_relaxed)) { 1331 send_stats_.PauseSendAndPacerBitrateCounters(); 1332 } else { 1333 send_stats_.AddTargetBitrateSample(target_bitrate_bps); 1334 } 1335 } 1336 1337 void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { 1338 RTC_DCHECK_RUN_ON(&send_transport_sequence_checker_); 1339 1340 transport_send_ptr_->SetAllocatedSendBitrateLimits(limits); 1341 send_stats_.SetMinAllocatableRate(limits); 1342 configured_max_padding_bitrate_bps_.store(limits.max_padding_rate.bps(), 1343 std::memory_order_relaxed); 1344 } 1345 1346 AudioReceiveStreamImpl* Call::FindAudioStreamForSyncGroup( 1347 absl::string_view sync_group) { 1348 RTC_DCHECK_RUN_ON(worker_thread_); 1349 RTC_DCHECK_RUN_ON(&receive_11993_checker_); 1350 if (!sync_group.empty()) { 1351 for (AudioReceiveStreamImpl* stream : audio_receive_streams_) { 1352 if (stream->sync_group() == sync_group) 1353 return stream; 1354 } 1355 } 1356 1357 return nullptr; 1358 } 1359 1360 void Call::ConfigureSync(absl::string_view sync_group) { 1361 // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread. 1362 RTC_DCHECK_RUN_ON(worker_thread_); 1363 // `audio_stream` may be nullptr when clearing the audio stream for a group. 1364 AudioReceiveStreamImpl* audio_stream = 1365 FindAudioStreamForSyncGroup(sync_group); 1366 1367 size_t num_synced_streams = 0; 1368 for (VideoReceiveStream2* video_stream : video_receive_streams_) { 1369 if (video_stream->sync_group() != sync_group) 1370 continue; 1371 ++num_synced_streams; 1372 // TODO(bugs.webrtc.org/4762): Support synchronizing more than one A/V pair. 1373 // Attempting to sync more than one audio/video pair within the same sync 1374 // group is not supported in the current implementation. 1375 // Only sync the first A/V pair within this sync group. 1376 if (num_synced_streams == 1) { 1377 // sync_audio_stream may be null and that's ok. 1378 video_stream->SetSync(audio_stream); 1379 } else { 1380 video_stream->SetSync(nullptr); 1381 } 1382 } 1383 } 1384 1385 void Call::DeliverRtcpPacket(CopyOnWriteBuffer packet) { 1386 RTC_DCHECK_RUN_ON(worker_thread_); 1387 RTC_DCHECK(IsRtcpPacket(packet)); 1388 TRACE_EVENT0("webrtc", "Call::DeliverRtcp"); 1389 1390 receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size())); 1391 bool rtcp_delivered = false; 1392 ArrayView<const uint8_t> packet_view(packet.cdata(), packet.size()); 1393 for (VideoReceiveStream2* stream : video_receive_streams_) { 1394 if (stream->DeliverRtcp(packet_view)) 1395 rtcp_delivered = true; 1396 } 1397 1398 for (AudioReceiveStreamImpl* stream : audio_receive_streams_) { 1399 stream->DeliverRtcp(packet_view); 1400 rtcp_delivered = true; 1401 } 1402 1403 for (VideoSendStreamImpl* stream : video_send_streams_) { 1404 stream->DeliverRtcp(packet); 1405 rtcp_delivered = true; 1406 } 1407 1408 for (auto& kv : audio_send_ssrcs_) { 1409 kv.second->DeliverRtcp(packet_view); 1410 rtcp_delivered = true; 1411 } 1412 1413 if (rtcp_delivered) { 1414 env_.event_log().Log(std::make_unique<RtcEventRtcpPacketIncoming>(packet)); 1415 } 1416 } 1417 1418 void Call::DeliverRtpPacket( 1419 MediaType media_type, 1420 RtpPacketReceived packet, 1421 OnUndemuxablePacketHandler undemuxable_packet_handler) { 1422 RTC_DCHECK_RUN_ON(worker_thread_); 1423 RTC_DCHECK(packet.arrival_time().IsFinite()); 1424 1425 if (receive_time_calculator_) { 1426 int64_t packet_time_us = packet.arrival_time().us(); 1427 // Repair packet_time_us for clock resets by comparing a new read of 1428 // the same clock (TimeUTCMicros) to a monotonic clock reading. 1429 packet_time_us = receive_time_calculator_->ReconcileReceiveTimes( 1430 packet_time_us, TimeUTCMicros(), env_.clock().TimeInMicroseconds()); 1431 packet.set_arrival_time(Timestamp::Micros(packet_time_us)); 1432 } 1433 1434 NotifyBweOfReceivedPacket(packet, media_type); 1435 1436 env_.event_log().Log(std::make_unique<RtcEventRtpPacketIncoming>(packet)); 1437 if (media_type != MediaType::AUDIO && media_type != MediaType::VIDEO) { 1438 return; 1439 } 1440 1441 const TimeDelta nw_to_deliver_delay = 1442 env_.clock().CurrentTime() - packet.arrival_time(); 1443 RTC_HISTOGRAM_COUNTS_100000("WebRTC.TimeFromNetworkToDeliverRtpPacketUs", 1444 nw_to_deliver_delay.us()); 1445 1446 RtpStreamReceiverController& receiver_controller = 1447 media_type == MediaType::AUDIO ? audio_receiver_controller_ 1448 : video_receiver_controller_; 1449 1450 if (!receiver_controller.OnRtpPacket(packet)) { 1451 // Demuxing failed. Allow the caller to create a 1452 // receive stream in order to handle unsignalled SSRCs and try again. 1453 // Note that we dont want to call NotifyBweOfReceivedPacket twice per 1454 // packet. 1455 if (!undemuxable_packet_handler(packet)) { 1456 return; 1457 } 1458 if (!receiver_controller.OnRtpPacket(packet)) { 1459 RTC_LOG(LS_INFO) << "Failed to demux packet " << packet.Ssrc(); 1460 return; 1461 } 1462 } 1463 1464 // RateCounters expect input parameter as int, save it as int, 1465 // instead of converting each time it is passed to RateCounter::Add below. 1466 int length = static_cast<int>(packet.size()); 1467 if (media_type == MediaType::AUDIO) { 1468 receive_stats_.AddReceivedAudioBytes(length, packet.arrival_time()); 1469 } 1470 if (media_type == MediaType::VIDEO) { 1471 receive_stats_.AddReceivedVideoBytes(length, packet.arrival_time()); 1472 } 1473 } 1474 1475 void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, 1476 MediaType media_type) { 1477 RTC_DCHECK_RUN_ON(worker_thread_); 1478 1479 ReceivedPacket packet_msg; 1480 packet_msg.size = DataSize::Bytes(packet.payload_size()); 1481 packet_msg.receive_time = packet.arrival_time(); 1482 uint32_t time_24; 1483 if (packet.GetExtension<AbsoluteSendTime>(&time_24)) { 1484 packet_msg.send_time = AbsoluteSendTime::ToTimestamp(time_24); 1485 } 1486 transport_send_->OnReceivedPacket(packet_msg); 1487 1488 receive_side_cc_.OnReceivedPacket(packet, media_type); 1489 } 1490 1491 } // namespace internal 1492 1493 } // namespace webrtc