tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

MediaPipeline.cpp (60911B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* This Source Code Form is subject to the terms of the Mozilla Public
      3 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
      4 * You can obtain one at http://mozilla.org/MPL/2.0/. */
      5 
      6 // Original author: ekr@rtfm.com
      7 
      8 #include "MediaPipeline.h"
      9 
     10 #include <inttypes.h>
     11 #include <math.h>
     12 
     13 #include <sstream>
     14 #include <utility>
     15 
     16 #include "AudioConverter.h"
     17 #include "AudioSegment.h"
     18 #include "DOMMediaStream.h"
     19 #include "ImageContainer.h"
     20 #include "ImageTypes.h"
     21 #include "MediaEngine.h"
     22 #include "MediaSegment.h"
     23 #include "MediaStreamTrack.h"
     24 #include "MediaTrackGraph.h"
     25 #include "MediaTrackListener.h"
     26 #include "RtpLogger.h"
     27 #include "Tracing.h"
     28 #include "VideoFrameConverter.h"
     29 #include "VideoSegment.h"
     30 #include "VideoStreamTrack.h"
     31 #include "VideoUtils.h"
     32 #include "common_video/include/video_frame_buffer.h"
     33 #include "jsapi/MediaTransportHandler.h"
     34 #include "jsapi/PeerConnectionImpl.h"
     35 #include "libwebrtcglue/MediaConduitInterface.h"
     36 #include "libwebrtcglue/WebrtcImageBuffer.h"
     37 #include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
     38 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
     39 #include "mozilla/Logging.h"
     40 #include "mozilla/NullPrincipal.h"
     41 #include "mozilla/PeerIdentity.h"
     42 #include "mozilla/Preferences.h"
     43 #include "mozilla/SharedThreadPool.h"
     44 #include "mozilla/Sprintf.h"
     45 #include "mozilla/StaticPrefs_media.h"
     46 #include "mozilla/TaskQueue.h"
     47 #include "mozilla/UniquePtr.h"
     48 #include "mozilla/dom/Document.h"
     49 #include "mozilla/dom/RTCStatsReportBinding.h"
     50 #include "mozilla/gfx/Point.h"
     51 #include "mozilla/gfx/Types.h"
     52 #include "nsError.h"
     53 #include "nsThreadUtils.h"
     54 #include "transport/runnable_utils.h"
     55 
// Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at
// 48KHz)
#define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
// Guard against WEBRTC_MAX_SAMPLE_RATE changing: one 10ms chunk (rate / 100
// frames) of 16-bit stereo must always fit in the buffer.
static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
                  AUDIO_SAMPLE_BUFFER_MAX_BYTES,
              "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");

using namespace mozilla;
using namespace mozilla::dom;
using namespace mozilla::gfx;
using namespace mozilla::layers;

// Shared log module for all MediaPipeline classes in this file.
mozilla::LazyLogModule gMediaPipelineLog("MediaPipeline");
     69 
     70 namespace mozilla {
     71 
// An async inserter for audio data, to avoid running audio codec encoders
// on the MTG/input audio thread.  Basically just bounces all the audio
// data to a single audio processing/input queue.  We could if we wanted to
// use multiple threads and a TaskQueue.
class AudioProxyThread {
 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)

  // aConduit receives the packetized audio frames; it must be non-null.
  explicit AudioProxyThread(RefPtr<AudioSessionConduit> aConduit)
      : mConduit(std::move(aConduit)),
        mTaskQueue(TaskQueue::Create(
            GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), "AudioProxy")),
        mAudioConverter(nullptr) {
    MOZ_ASSERT(mConduit);
    MOZ_COUNT_CTOR(AudioProxyThread);
  }

  // This function is the identity if aInputRate is supported.
  // Otherwise, it returns a rate that is supported, that ensures no loss in
  // audio quality: the sampling rate returned is always greater than or equal
  // to the input sampling rate, if they differ.
  uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
    AudioSessionConduit* conduit =
        static_cast<AudioSessionConduit*>(mConduit.get());
    if (conduit->IsSamplingFreqSupported(aInputRate)) {
      return aInputRate;
    }
    // Pick the smallest fallback rate that is >= aInputRate.
    if (aInputRate < 16000) {
      return 16000;
    }
    if (aInputRate < 32000) {
      return 32000;
    }
    if (aInputRate < 44100) {
      return 44100;
    }
    return 48000;
  }

  // From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
  // something the conduit can work with (or send silence if the track is not
  // enabled), and send the audio in 10ms chunks to the conduit.
  void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
                                 bool aEnabled) {
    MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());

    // Convert to interleaved 16-bits integer audio, with a maximum of two
    // channels (since the WebRTC.org code below makes the assumption that the
    // input audio is either mono or stereo), with a sample rate that is
    // 16, 32, 44.1, or 48kHz.
    uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
    int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);

    // We take advantage of the fact that the common case (microphone directly
    // to PeerConnection, that is, a normal call), the samples are already
    // 16-bits mono, so the representation in interleaved and planar is the
    // same, and we can just use that.
    if (aEnabled && outputChannels == 1 &&
        aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
      const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
      PacketizeAndSend(samples, transmissionRate, outputChannels,
                       aChunk.mDuration);
      return;
    }

    // Slow path: interleave (and possibly downmix) into mInterleavedAudio.
    // The buffer only ever grows; it is reused across chunks.
    uint32_t sampleCount = aChunk.mDuration * outputChannels;
    if (mInterleavedAudio.Length() < sampleCount) {
      mInterleavedAudio.SetLength(sampleCount);
    }

    if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
      // Disabled tracks send silence rather than stopping.
      PodZero(mInterleavedAudio.Elements(), sampleCount);
    } else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
      DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
                           aChunk.mVolume, outputChannels,
                           mInterleavedAudio.Elements());
    } else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
      DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
                           aChunk.mVolume, outputChannels,
                           mInterleavedAudio.Elements());
    }
    int16_t* inputAudio = mInterleavedAudio.Elements();
    size_t inputAudioFrameCount = aChunk.mDuration;

    AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
                            AudioConfig::FORMAT_S16);
    AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
                             transmissionRate, AudioConfig::FORMAT_S16);
    // Resample to an acceptable sample-rate for the sending side.  The
    // converter is cached and only rebuilt when either config changes.
    if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
        mAudioConverter->OutputConfig() != outputConfig) {
      mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
    }

    int16_t* processedAudio = nullptr;
    // Try the in-place conversion first; a return of 0 means it was not
    // possible for this config.
    size_t framesProcessed =
        mAudioConverter->Process(inputAudio, inputAudioFrameCount);

    if (framesProcessed == 0) {
      // In place conversion not possible, use a buffer.
      framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
                                                 inputAudioFrameCount);
      processedAudio = mOutputAudio.Data();
    } else {
      processedAudio = inputAudio;
    }

    PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
                     framesProcessed);
  }

  // This packetizes aAudioData in 10ms chunks and sends it.
  // aAudioData is interleaved audio data at a rate and with a channel count
  // that is appropriate to send with the conduit.
  void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
                        uint32_t aChannels, uint32_t aFrameCount) {
    MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
    MOZ_ASSERT(aChannels == 1 || aChannels == 2);
    MOZ_ASSERT(aAudioData);

    // Number of frames in one 10ms packet at this rate.
    uint32_t audio_10ms = aRate / 100;

    if (!mPacketizer || mPacketizer->mPacketSize != audio_10ms ||
        mPacketizer->mChannels != aChannels) {
      // It's the right thing to drop the bit of audio still in the packetizer:
      // we don't want to send to the conduit audio that has two different
      // rates while telling it that it has a constant rate.
      mPacketizer =
          MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
      mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
    }

    mPacketizer->Input(aAudioData, aFrameCount);

    while (mPacketizer->PacketsAvailable()) {
      mPacketizer->Output(mPacket.get());
      auto frame = std::make_unique<webrtc::AudioFrame>();
      // UpdateFrame makes a copy of the audio data.
      frame->UpdateFrame(frame->timestamp_, mPacket.get(),
                         mPacketizer->mPacketSize, aRate, frame->speech_type_,
                         frame->vad_activity_, mPacketizer->mChannels);
      mConduit->SendAudioFrame(std::move(frame));
    }
  }

  // Thread-safe entry point: bounces the chunk to mTaskQueue, where
  // InternalProcessAudioChunk does the actual work.
  void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
                       bool aEnabled) {
    RefPtr<AudioProxyThread> self = this;
    nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
        "AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
          self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
        }));
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
    (void)rv;
  }

 protected:
  virtual ~AudioProxyThread() { MOZ_COUNT_DTOR(AudioProxyThread); }

  const RefPtr<AudioSessionConduit> mConduit;
  const RefPtr<TaskQueue> mTaskQueue;
  // Only accessed on mTaskQueue
  UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
  // A buffer to hold a single packet of audio.
  UniquePtr<int16_t[]> mPacket;
  // Scratch buffer for interleaving/downmixing; grows, never shrinks.
  nsTArray<int16_t> mInterleavedAudio;
  // Destination buffer when in-place sample-rate conversion is not possible.
  AlignedShortBuffer mOutputAudio;
  // Cached converter; rebuilt whenever input or output config changes.
  UniquePtr<AudioConverter> mAudioConverter;
};
    241 
// Declares a Mirror<T> member bound to the main thread with a debug name.
#define INIT_MIRROR(name, val) \
  name(AbstractThread::MainThread(), val, "MediaPipeline::" #name " (Mirror)")

// Wires the pipeline to its conduit.  For TRANSMIT pipelines the conduit's
// send events are routed to SendPacket on the STS thread; for RECEIVE
// pipelines the receive events are connected instead, plus the receiver's
// RTCP send path (receiver reports still need to be sent out).
MediaPipeline::MediaPipeline(const std::string& aPc,
                             RefPtr<MediaTransportHandler> aTransportHandler,
                             DirectionType aDirection,
                             RefPtr<AbstractThread> aCallThread,
                             RefPtr<nsISerialEventTarget> aStsThread,
                             RefPtr<MediaSessionConduit> aConduit)
    : mConduit(std::move(aConduit)),
      mDirection(aDirection),
      mCallThread(std::move(aCallThread)),
      mStsThread(std::move(aStsThread)),
      INIT_MIRROR(mActive, false),
      mActiveSts(false),
      mLevel(0),
      mTransportHandler(std::move(aTransportHandler)),
      mRtpPacketsSent(0),
      mRtcpPacketsSent(0),
      mRtpPacketsReceived(0),
      mRtpBytesSent(0),
      mRtpBytesReceived(0),
      mPc(aPc),
      mRtpHeaderExtensionMap(new webrtc::RtpHeaderExtensionMap()),
      mPacketDumper(PacketDumper::GetPacketDumper(mPc)) {
  if (mDirection == DirectionType::TRANSMIT) {
    mRtpSendEventListener = mConduit->SenderRtpSendEvent().Connect(
        mStsThread, this, &MediaPipeline::SendPacket);
    mSenderRtcpSendEventListener = mConduit->SenderRtcpSendEvent().Connect(
        mStsThread, this, &MediaPipeline::SendPacket);
    mConduit->ConnectSenderRtcpEvent(mRtcpReceiveEvent);
  } else {
    mConduit->ConnectReceiverRtpEvent(mRtpReceiveEvent);
    mConduit->ConnectReceiverRtcpEvent(mRtcpReceiveEvent);
    mReceiverRtcpSendEventListener = mConduit->ReceiverRtcpSendEvent().Connect(
        mStsThread, this, &MediaPipeline::SendPacket);
  }
}

#undef INIT_MIRROR
    282 
    283 MediaPipeline::~MediaPipeline() {
    284  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
    285          ("Destroying MediaPipeline: %s", mDescription.c_str()));
    286 }
    287 
// Main-thread teardown entry point.  Disconnects the mActive mirror here and
// dispatches DetachTransport_s to the STS thread, where all transport state
// lives.  The RefPtr in WrapRunnable keeps |this| alive across the dispatch.
void MediaPipeline::Shutdown() {
  MOZ_ASSERT(NS_IsMainThread());

  mActive.DisconnectIfConnected();
  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::DetachTransport_s),
                NS_DISPATCH_NORMAL);
}
    297 
// STS-thread half of Shutdown(): resets transport state, tells the conduit
// the transport is gone, and disconnects every event listener this pipeline
// registered (both conduit-side send listeners and transport-handler-side
// receive/state listeners).
void MediaPipeline::DetachTransport_s() {
  ASSERT_ON_THREAD(mStsThread);

  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
          ("%s in %s", mDescription.c_str(), __FUNCTION__));

  disconnect_all();
  mRtpState = TransportLayer::TS_NONE;
  mRtcpState = TransportLayer::TS_NONE;
  mTransportId.clear();
  // Stop the conduit from producing packets before we drop the listeners.
  mConduit->SetTransportActive(false);
  mRtpSendEventListener.DisconnectIfExists();
  mSenderRtcpSendEventListener.DisconnectIfExists();
  mReceiverRtcpSendEventListener.DisconnectIfExists();
  mRtpPacketReceivedListener.DisconnectIfExists();
  mStateChangeListener.DisconnectIfExists();
  mRtcpStateChangeListener.DisconnectIfExists();
  mEncryptedSendingListener.DisconnectIfExists();
  mAlpnNegotiatedListener.DisconnectIfExists();
}
    318 
    319 void MediaPipeline::UpdateTransport_m(const std::string& aTransportId,
    320                                      UniquePtr<MediaPipelineFilter>&& aFilter,
    321                                      bool aSignalingStable) {
    322  mStsThread->Dispatch(NS_NewRunnableFunction(
    323      __func__,
    324      [aTransportId, filter = std::move(aFilter),
    325       self = RefPtr<MediaPipeline>(this), aSignalingStable]() mutable {
    326        self->UpdateTransport_s(aTransportId, std::move(filter),
    327                                aSignalingStable);
    328      }));
    329 }
    330 
// STS-thread transport update: lazily connects transport-handler signals on
// first call, picks up the new transport's state if the id changed, and
// merges or replaces the packet filter, keeping the RTP header-extension map
// in sync with the filter's extmap.
void MediaPipeline::UpdateTransport_s(const std::string& aTransportId,
                                      UniquePtr<MediaPipelineFilter>&& aFilter,
                                      bool aSignalingStable) {
  ASSERT_ON_THREAD(mStsThread);
  // TODO: Now that this no longer uses sigslot, we can handle these events on
  // threads other than STS, when it makes sense to. It might be worthwhile to
  // do the packet handling on the call thread only, to save a thread dispatch.
  if (!mSignalsConnected) {
    mStateChangeListener = mTransportHandler->GetStateChange().Connect(
        mStsThread, this, &MediaPipeline::RtpStateChange);
    mRtcpStateChangeListener = mTransportHandler->GetRtcpStateChange().Connect(
        mStsThread, this, &MediaPipeline::RtcpStateChange);
    // Probably want to only conditionally register this
    mEncryptedSendingListener =
        mTransportHandler->GetEncryptedSending().Connect(
            mStsThread, this, &MediaPipeline::EncryptedPacketSending);
    mRtpPacketReceivedListener =
        mTransportHandler->GetRtpPacketReceived().Connect(
            mStsThread, this, &MediaPipeline::PacketReceived);
    mAlpnNegotiatedListener = mTransportHandler->GetAlpnNegotiated().Connect(
        mStsThread, this, &MediaPipeline::AlpnNegotiated);
    mSignalsConnected = true;
  }

  if (aTransportId != mTransportId) {
    // New flow; query its current RTP/RTCP states and react to them.
    mTransportId = aTransportId;
    mRtpState = mTransportHandler->GetState(mTransportId, false);
    mRtcpState = mTransportHandler->GetState(mTransportId, true);
    CheckTransportStates();
  }

  // Drop the old filter's header extensions before installing the new set.
  if (mFilter) {
    for (const auto& extension : mFilter->GetExtmap()) {
      mRtpHeaderExtensionMap->Deregister(extension.uri);
    }
  }
  if (mFilter && aFilter) {
    // Use the new filter, but don't forget any remote SSRCs that we've learned
    // by receiving traffic.
    mFilter->Update(*aFilter, aSignalingStable);
  } else {
    mFilter = std::move(aFilter);
  }
  if (mFilter) {
    for (const auto& extension : mFilter->GetExtmap()) {
      mRtpHeaderExtensionMap->RegisterByUri(extension.id, extension.uri);
    }
  }
}
    380 
    381 void MediaPipeline::GetContributingSourceStats(
    382    const nsString& aInboundRtpStreamId,
    383    FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
    384  ASSERT_ON_THREAD(mStsThread);
    385  // Get the expiry from now
    386  DOMHighResTimeStamp expiry =
    387      RtpCSRCStats::GetExpiryFromTime(GetTimestampMaker().GetNow().ToDom());
    388  for (auto info : mCsrcStats) {
    389    if (!info.second.Expired(expiry)) {
    390      RTCRTPContributingSourceStats stats;
    391      info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
    392      if (!aArr.AppendElement(stats, fallible)) {
    393        mozalloc_handle_oom(0);
    394      }
    395    }
    396  }
    397 }
    398 
    399 void MediaPipeline::RtpStateChange(const std::string& aTransportId,
    400                                   TransportLayer::State aState) {
    401  if (mTransportId != aTransportId) {
    402    return;
    403  }
    404  mRtpState = aState;
    405  CheckTransportStates();
    406 }
    407 
    408 void MediaPipeline::RtcpStateChange(const std::string& aTransportId,
    409                                    TransportLayer::State aState) {
    410  if (mTransportId != aTransportId) {
    411    return;
    412  }
    413  mRtcpState = aState;
    414  CheckTransportStates();
    415 }
    416 
// Reacts to the combined RTP/RTCP transport state: on failure/close it
// deactivates the conduit and disconnects the send listeners; once both
// states reach TS_OPEN it activates the conduit and calls TransportReady_s().
void MediaPipeline::CheckTransportStates() {
  ASSERT_ON_THREAD(mStsThread);

  if (mRtpState == TransportLayer::TS_CLOSED ||
      mRtpState == TransportLayer::TS_ERROR ||
      mRtcpState == TransportLayer::TS_CLOSED ||
      mRtcpState == TransportLayer::TS_ERROR) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Warning,
            ("RTP Transport failed for pipeline %p flow %s", this,
             mDescription.c_str()));

    NS_WARNING(
        "MediaPipeline Transport failed. This is not properly cleaned up yet");
    // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
    // connection was good and now it is bad.
    // TODO(ekr@rtfm.com): Report up so that the PC knows we
    // have experienced an error.
    mConduit->SetTransportActive(false);
    mRtpSendEventListener.DisconnectIfExists();
    mSenderRtcpSendEventListener.DisconnectIfExists();
    mReceiverRtcpSendEventListener.DisconnectIfExists();
    return;
  }

  if (mRtpState == TransportLayer::TS_OPEN) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
            ("RTP Transport ready for pipeline %p flow %s", this,
             mDescription.c_str()));
  }

  if (mRtcpState == TransportLayer::TS_OPEN) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
            ("RTCP Transport ready for pipeline %p flow %s", this,
             mDescription.c_str()));
  }

  // Both TS_OPEN (mRtcpState == mRtpState covers rtcp-mux and plain): go live.
  if (mRtpState == TransportLayer::TS_OPEN && mRtcpState == mRtpState) {
    mConduit->SetTransportActive(true);
    TransportReady_s();
  }
}
    458 
// Sends an outgoing RTP or RTCP packet (produced by the conduit) through the
// transport handler.  Packets are silently dropped while the corresponding
// transport is not open.  Also feeds the packet logger/dumper and the sent
// counters before handing the packet off; aPacket is consumed by the send.
void MediaPipeline::SendPacket(MediaPacket&& aPacket) {
  ASSERT_ON_THREAD(mStsThread);

  const bool isRtp = aPacket.type() == MediaPacket::RTP;

  if (isRtp && mRtpState != TransportLayer::TS_OPEN) {
    return;
  }

  if (!isRtp && mRtcpState != TransportLayer::TS_OPEN) {
    return;
  }

  aPacket.sdp_level() = Some(Level());

  if (RtpLogger::IsPacketLoggingOn()) {
    RtpLogger::LogPacket(aPacket, false, mDescription);
  }

  // Dump and count before the packet is moved into the transport handler.
  if (isRtp) {
    mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtp, true,
                        aPacket.data(), aPacket.len());
    IncrementRtpPacketsSent(aPacket);
  } else {
    mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtcp, true,
                        aPacket.data(), aPacket.len());
    IncrementRtcpPacketsSent();
  }

  MOZ_LOG(
      gMediaPipelineLog, LogLevel::Debug,
      ("%s sending %s packet", mDescription.c_str(), (isRtp ? "RTP" : "RTCP")));

  mTransportHandler->SendPacket(mTransportId, std::move(aPacket));
}
    494 
    495 void MediaPipeline::IncrementRtpPacketsSent(const MediaPacket& aPacket) {
    496  ASSERT_ON_THREAD(mStsThread);
    497  ++mRtpPacketsSent;
    498  mRtpBytesSent += aPacket.len();
    499 
    500  if (!(mRtpPacketsSent % 100)) {
    501    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
    502            ("RTP sent packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
    503             mDescription.c_str(), this, mRtpPacketsSent, mRtpBytesSent));
    504  }
    505 }
    506 
    507 void MediaPipeline::IncrementRtcpPacketsSent() {
    508  ASSERT_ON_THREAD(mStsThread);
    509  ++mRtcpPacketsSent;
    510  if (!(mRtcpPacketsSent % 100)) {
    511    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
    512            ("RTCP sent packet count for %s Pipeline %p: %u",
    513             mDescription.c_str(), this, mRtcpPacketsSent));
    514  }
    515 }
    516 
    517 void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
    518  ASSERT_ON_THREAD(mStsThread);
    519  ++mRtpPacketsReceived;
    520  mRtpBytesReceived += aBytes;
    521  if (!(mRtpPacketsReceived % 100)) {
    522    MOZ_LOG(
    523        gMediaPipelineLog, LogLevel::Info,
    524        ("RTP received packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
    525         mDescription.c_str(), this, mRtpPacketsReceived, mRtpBytesReceived));
    526  }
    527 }
    528 
    529 void MediaPipeline::PacketReceived(std::string& aTransportId,
    530                                   MediaPacket& packet) {
    531  ASSERT_ON_THREAD(mStsThread);
    532 
    533  if (!mActiveSts) {
    534    return;
    535  }
    536 
    537  if (mTransportId != aTransportId) {
    538    return;
    539  }
    540 
    541  MOZ_ASSERT(mRtpState == TransportLayer::TS_OPEN);
    542 
    543  if (!packet.len() || !packet.data()) {
    544    return;
    545  }
    546 
    547  switch (packet.type()) {
    548    case MediaPacket::RTP:
    549      RtpPacketReceived(aTransportId, packet);
    550      break;
    551    case MediaPacket::RTCP:
    552      RtcpPacketReceived(aTransportId, packet);
    553      break;
    554    default:;
    555  }
    556 }
    557 
// Handles one incoming RTP packet on the STS thread: parses it with the
// current header-extension map, applies the pipeline filter, refreshes the
// per-CSRC stats, logs/dumps it, and forwards the parsed packet to the
// conduit via mRtpReceiveEvent.  Transmit pipelines ignore incoming RTP.
void MediaPipeline::RtpPacketReceived(std::string& aTransportId,
                                      MediaPacket& packet) {
  if (mDirection == DirectionType::TRANSMIT) {
    return;
  }

  webrtc::RTPHeader header;

  // It is really, really lame that CopyOnWriteBuffer cannot take ownership of
  // a buffer. Conceivably, we could avoid the copy by using CopyOnWriteBuffer
  // inside MediaPacket, but that would let libwebrtc stuff leak into all parts
  // of our codebase.
  webrtc::CopyOnWriteBuffer packet_buffer(packet.data(), packet.len());
  webrtc::RtpPacketReceived parsedPacket(mRtpHeaderExtensionMap.get());
  if (!parsedPacket.Parse(packet_buffer)) {
    // Malformed RTP; drop silently.
    return;
  }
  parsedPacket.GetHeader(&header);

  // The filter rejects packets that do not belong to this pipeline (by SSRC
  // etc.); see MediaPipelineFilter.
  if (mFilter && !mFilter->Filter(header)) {
    return;
  }

  auto now = GetTimestampMaker().GetNow();
  parsedPacket.set_arrival_time(now.ToRealtime());
  if (IsVideo()) {
    parsedPacket.set_payload_type_frequency(webrtc::kVideoPayloadTypeFrequency);
  }

  // Remove expired RtpCSRCStats
  if (!mCsrcStats.empty()) {
    auto expiry = RtpCSRCStats::GetExpiryFromTime(now.ToDom());
    for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
      if (p->second.Expired(expiry)) {
        p = mCsrcStats.erase(p);
        continue;
      }
      p++;
    }
  }

  // Add new RtpCSRCStats, or refresh the timestamp of known CSRCs.
  if (header.numCSRCs) {
    for (auto i = 0; i < header.numCSRCs; i++) {
      auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
      if (csrcInfo == mCsrcStats.end()) {
        mCsrcStats.insert(
            std::make_pair(header.arrOfCSRCs[i],
                           RtpCSRCStats(header.arrOfCSRCs[i], now.ToDom())));
      } else {
        csrcInfo->second.SetTimestamp(now.ToDom());
      }
    }
  }

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("%s received RTP packet.", mDescription.c_str()));
  IncrementRtpPacketsReceived(packet.len());

  RtpLogger::LogPacket(packet, true, mDescription);

  // Might be nice to pass ownership of the buffer in this case, but it is a
  // small optimization in a rare case.
  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false,
                      packet.encrypted_data(), packet.encrypted_len());

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(),
                      packet.len());

  mRtpReceiveEvent.Notify(std::move(parsedPacket), header);
}
    629 
// Handles one incoming RTCP packet.  Note the steal-semantics below: the
// packet is moved out of aPacket, so later observers of the same notification
// see an empty packet.
void MediaPipeline::RtcpPacketReceived(std::string& aTransportId,
                                       MediaPacket& aPacket) {
  // The first MediaPipeline to get this notification handles the packet, all
  // others will see an empty packet and ignore it. It does not matter whether
  // the pipeline is transmit or receive, or which m-section it is associated
  // with.
  MediaPacket packet(std::move(aPacket));

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("%s received RTCP packet.", mDescription.c_str()));

  RtpLogger::LogPacket(packet, true, mDescription);

  // Might be nice to pass ownership of the buffer in this case, but it is a
  // small optimization in a rare case.
  // SIZE_MAX level: RTCP is not tied to a specific m-section.
  mPacketDumper->Dump(SIZE_MAX, dom::mozPacketDumpType::Srtcp, false,
                      packet.encrypted_data(), packet.encrypted_len());

  mPacketDumper->Dump(SIZE_MAX, dom::mozPacketDumpType::Rtcp, false,
                      packet.data(), packet.len());

  // Test/debugging pref to simulate RTCP loss.
  if (StaticPrefs::media_webrtc_net_force_disable_rtcp_reception()) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("%s RTCP packet forced to be dropped", mDescription.c_str()));
    return;
  }

  // CopyOnWriteBuffer cannot take ownership of an existing buffer. Sadface.
  // But, this is RTCP, so the packets are relatively small and infrequent.
  mRtcpReceiveEvent.Notify(
      webrtc::CopyOnWriteBuffer(packet.data(), packet.len()));
}
    662 
    663 void MediaPipeline::AlpnNegotiated(const std::string& aAlpn,
    664                                   bool aPrivacyRequested) {
    665  ASSERT_ON_THREAD(mStsThread);
    666 
    667  if (aPrivacyRequested && Direction() == DirectionType::RECEIVE) {
    668    // This will force the receive pipelines to drop data until they have
    669    // received a private PrincipalHandle from RTCRtpReceiver (which takes a
    670    // detour via main thread).
    671    static_cast<MediaPipelineReceive*>(this)->OnPrivacyRequested_s();
    672  }
    673 }
    674 
    675 void MediaPipeline::EncryptedPacketSending(const std::string& aTransportId,
    676                                           const MediaPacket& aPacket) {
    677  ASSERT_ON_THREAD(mStsThread);
    678 
    679  if (mTransportId == aTransportId) {
    680    dom::mozPacketDumpType type;
    681    if (aPacket.type() == MediaPacket::SRTP) {
    682      type = dom::mozPacketDumpType::Srtp;
    683    } else if (aPacket.type() == MediaPacket::SRTCP) {
    684      type = dom::mozPacketDumpType::Srtcp;
    685    } else if (aPacket.type() == MediaPacket::DTLS) {
    686      // TODO(bug 1497936): Implement packet dump for DTLS
    687      return;
    688    } else {
    689      MOZ_ASSERT(false);
    690      return;
    691    }
    692    mPacketDumper->Dump(Level(), type, true, aPacket.data(), aPacket.len());
    693  }
    694 }
    695 
// Listener installed on the sending MediaStreamTrack; receives media from the
// MediaTrackGraph and forwards it toward the conduit, either through the
// AudioProxyThread (audio) or the VideoFrameConverter (video).
class MediaPipelineTransmit::PipelineListener
    : public DirectMediaTrackListener {
  friend class MediaPipelineTransmit;

 public:
  explicit PipelineListener(RefPtr<MediaSessionConduit> aConduit)
      : mConduit(std::move(aConduit)),
        mActive(false),
        mEnabled(false),
        mDirectConnect(false) {}

  ~PipelineListener() {
    // Stop the video converter's processing before this listener goes away.
    if (mConverter) {
      mConverter->Shutdown();
    }
  }

  // Active means there is a transport to send on; propagated to the video
  // converter when one is present.
  void SetActive(bool aActive) {
    mActive = aActive;
    if (mConverter) {
      mConverter->SetActive(aActive);
    }
  }
  // When disabled, black/silence is produced instead of real content.
  void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }

  // These are needed since nested classes don't have access to any particular
  // instance of the parent
  void SetAudioProxy(RefPtr<AudioProxyThread> aProxy) {
    mAudioProcessing = std::move(aProxy);
  }

  void SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter) {
    mConverter = std::move(aConverter);
  }

  // Implement MediaTrackListener
  void NotifyQueuedChanges(MediaTrackGraph* aGraph, TrackTime aOffset,
                           const MediaSegment& aQueuedMedia) override;
  void NotifyEnabledStateChanged(MediaTrackGraph* aGraph,
                                 bool aEnabled) override;

  // Implement DirectMediaTrackListener
  void NotifyRealtimeTrackData(MediaTrackGraph* aGraph, TrackTime aOffset,
                               const MediaSegment& aMedia) override;
  void NotifyDirectListenerInstalled(InstallationResult aResult) override;
  void NotifyDirectListenerUninstalled() override;

 private:
  // Common path for both queued and realtime data notifications.
  void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);

  const RefPtr<MediaSessionConduit> mConduit;
  RefPtr<AudioProxyThread> mAudioProcessing;
  RefPtr<VideoFrameConverter> mConverter;

  // active is true if there is a transport to send on
  mozilla::Atomic<bool> mActive;
  // enabled is true if the media access control permits sending
  // actual content; when false you get black/silence
  mozilla::Atomic<bool> mEnabled;

  // Written and read on the MediaTrackGraph thread
  bool mDirectConnect;
};
    759 
// Constructs a transmit pipeline. For audio, the AudioProxyThread is created
// and wired to the listener here; video setup is completed separately in
// RegisterListener().
MediaPipelineTransmit::MediaPipelineTransmit(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    bool aIsVideo, RefPtr<MediaSessionConduit> aConduit)
    : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::TRANSMIT,
                    std::move(aCallThread), std::move(aStsThread),
                    std::move(aConduit)),
      mWatchManager(this, AbstractThread::MainThread()),
      mIsVideo(aIsVideo),
      mListener(new PipelineListener(mConduit)),
      mDomTrack(nullptr, "MediaPipelineTransmit::mDomTrack"),
      mSendTrackOverride(nullptr, "MediaPipelineTransmit::mSendTrackOverride") {
  if (!IsVideo()) {
    mAudioProcessing =
        MakeAndAddRef<AudioProxyThread>(*mConduit->AsAudioSessionConduit());
    mListener->SetAudioProxy(mAudioProcessing);
  }

  // Re-evaluate pipeline state on the main thread whenever any of the inputs
  // to UpdateSendState changes.
  mWatchManager.Watch(mActive, &MediaPipeline::UpdateActive);
  mWatchManager.Watch(mActive, &MediaPipelineTransmit::UpdateSendState);
  mWatchManager.Watch(mDomTrack, &MediaPipelineTransmit::UpdateSendState);
  mWatchManager.Watch(mSendTrackOverride,
                      &MediaPipelineTransmit::UpdateSendState);

  mDescription = GenerateDescription();
}
    786 
    787 void MediaPipelineTransmit::RegisterListener() {
    788  if (!IsVideo()) {
    789    return;
    790  }
    791  RefPtr videoConduit = *mConduit->AsVideoSessionConduit();
    792  mConverter = VideoFrameConverter::Create(
    793      TaskQueue::Create(GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER),
    794                        "VideoFrameConverter")
    795          .forget(),
    796      GetTimestampMaker(), videoConduit->LockScaling());
    797  mConverter->SetIdleFrameDuplicationInterval(TimeDuration::FromSeconds(1));
    798  videoConduit->SetTrackSource(mConverter);
    799  mListener->SetVideoFrameConverter(mConverter);
    800 }
    801 
    802 already_AddRefed<MediaPipelineTransmit> MediaPipelineTransmit::Create(
    803    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    804    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    805    bool aIsVideo, RefPtr<MediaSessionConduit> aConduit) {
    806  RefPtr<MediaPipelineTransmit> transmit = new MediaPipelineTransmit(
    807      aPc, std::move(aTransportHandler), std::move(aCallThread),
    808      std::move(aStsThread), aIsVideo, std::move(aConduit));
    809 
    810  transmit->RegisterListener();
    811 
    812  return transmit.forget();
    813 }
    814 
MediaPipelineTransmit::~MediaPipelineTransmit() {
  mFrameListener.DisconnectIfExists();

  // Shutdown() must have run and completed the async detach before the
  // pipeline is destroyed.
  MOZ_ASSERT(!mTransmitting);
  MOZ_ASSERT(!mDomTrack.Ref());
}
    821 
// Mirror the controller's transmitting state into mActive, which drives
// UpdateSendState via the watch manager.
void MediaPipelineTransmit::InitControl(
    MediaPipelineTransmitControlInterface* aControl) {
  aControl->CanonicalTransmitting().ConnectMirror(&mActive);
}
    826 
void MediaPipelineTransmit::Shutdown() {
  MediaPipeline::Shutdown();
  mWatchManager.Shutdown();
  // Stop observing principal changes and drop the track reference.
  if (mDomTrack.Ref()) {
    mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
    mDomTrack = nullptr;
  }
  // Clear the gate so the final UpdateSendState can run the detach path.
  mUnsettingSendTrack = false;
  UpdateSendState();
  MOZ_ASSERT(!mTransmitting);
}
    838 
    839 void MediaPipeline::SetDescription_s(const std::string& description) {
    840  ASSERT_ON_THREAD(mStsThread);
    841  mDescription = description;
    842 }
    843 
    844 std::string MediaPipelineTransmit::GenerateDescription() const {
    845  MOZ_ASSERT(NS_IsMainThread());
    846 
    847  std::stringstream description;
    848  description << mPc << "| ";
    849  description << (mIsVideo ? "Transmit video[" : "Transmit audio[");
    850 
    851  if (mDomTrack.Ref()) {
    852    nsString nsTrackId;
    853    mDomTrack.Ref()->GetId(nsTrackId);
    854    description << NS_ConvertUTF16toUTF8(nsTrackId).get();
    855  } else if (mSendTrackOverride.Ref()) {
    856    description << "override " << mSendTrackOverride.Ref().get();
    857  } else {
    858    description << "no track";
    859  }
    860 
    861  description << "]";
    862 
    863  return description.str();
    864 }
    865 
// Main thread. Forwards the current mActive value to the sts thread's copy
// (mActiveSts) so sts-thread code can read it without cross-thread access.
void MediaPipeline::UpdateActive() {
  MOZ_ASSERT(NS_IsMainThread());
  mStsThread->Dispatch(NS_NewRunnableFunction(
      __func__, [this, self = RefPtr(this), active = mActive.Ref()] {
        mActiveSts = active;
      }));
}
    873 
void MediaPipelineTransmit::UpdateSendState() {
  MOZ_ASSERT(NS_IsMainThread());

  // This runs because either mActive, mDomTrack or mSendTrackOverride changed,
  // or because mSendTrack was unset async. Based on these inputs this method
  // is responsible for hooking up mSendTrack to mListener in order to feed data
  // to the conduit.
  //
  // If we are inactive, or if the send track does not match what we want to
  // send (mDomTrack or mSendTrackOverride), we must stop feeding data to the
  // conduit. NB that removing the listener from mSendTrack is async, and we
  // must wait for it to resolve before adding mListener to another track.
  // mUnsettingSendTrack gates us until the listener has been removed from
  // mSendTrack.
  //
  // If we are active and the send track does match what we want to send, we
  // make sure mListener is added to the send track. Either now, or if we're
  // still waiting for another send track to be removed, during a future call to
  // this method.

  if (mUnsettingSendTrack) {
    // We must wait for the send track to be unset before we can set it again,
    // to avoid races. Once unset this function is triggered again.
    return;
  }

  const bool wasTransmitting = mTransmitting;

  const bool haveLiveSendTrack = mSendTrack && !mSendTrack->IsDestroyed();
  const bool haveLiveDomTrack = mDomTrack.Ref() && !mDomTrack.Ref()->Ended();
  const bool haveLiveOverrideTrack =
      mSendTrackOverride.Ref() && !mSendTrackOverride.Ref()->IsDestroyed();
  // The existing send track must be torn down if (absent an override) it no
  // longer forwards the dom track we want to send.
  const bool mustRemoveSendTrack =
      haveLiveSendTrack && !mSendTrackOverride.Ref() &&
      (!haveLiveDomTrack || mDomTrack.Ref()->GetTrack() != mSendPortSource);

  mTransmitting = mActive && (haveLiveDomTrack || haveLiveOverrideTrack) &&
                  !mustRemoveSendTrack;

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("MediaPipeline %p UpdateSendState wasTransmitting=%d, active=%d, "
           "sendTrack=%p (%s), domTrack=%p (%s), "
           "sendTrackOverride=%p (%s), mustRemove=%d, mTransmitting=%d",
           this, wasTransmitting, mActive.Ref(), mSendTrack.get(),
           haveLiveSendTrack ? "live" : "ended", mDomTrack.Ref().get(),
           haveLiveDomTrack ? "live" : "ended", mSendTrackOverride.Ref().get(),
           haveLiveOverrideTrack ? "live" : "ended", mustRemoveSendTrack,
           mTransmitting));

  // Attach path: went from not transmitting to transmitting.
  if (!wasTransmitting && mTransmitting) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Attaching pipeline %p to track %p conduit type=%s", this,
             mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
    if (mDescriptionInvalidated) {
      // Only update the description when we attach to a track, as detaching is
      // always a longer async step than updating the description. Updating on
      // detach would cause the wrong track id to be attributed in logs.
      RUN_ON_THREAD(mStsThread,
                    WrapRunnable(RefPtr<MediaPipeline>(this),
                                 &MediaPipelineTransmit::SetDescription_s,
                                 GenerateDescription()),
                    NS_DISPATCH_NORMAL);
      mDescriptionInvalidated = false;
    }
    if (mSendTrackOverride.Ref()) {
      // Special path that allows unittests to avoid mDomTrack and the graph by
      // manually calling SetSendTrack.
      mSendTrack = mSendTrackOverride.Ref();
    } else {
      // Normal path: forward the dom track's graph track into a dedicated
      // send track via an input port.
      mSendTrack = mDomTrack.Ref()->Graph()->CreateForwardedInputTrack(
          mDomTrack.Ref()->GetTrack()->mType);
      mSendPortSource = mDomTrack.Ref()->GetTrack();
      mSendPort = mSendTrack->AllocateInputPort(mSendPortSource.get());
    }
    if (mIsVideo) {
      mConverter->SetTrackingId(mDomTrack.Ref()->GetSource().mTrackingId);
    }
    mSendTrack->QueueSetAutoend(false);
    // Video uses the direct listener path; both get the queued listener.
    if (mIsVideo) {
      mSendTrack->AddDirectListener(mListener);
    }
    mSendTrack->AddListener(mListener);
  }

  // Detach path: went from transmitting to not transmitting. Listener removal
  // resolves asynchronously; mUnsettingSendTrack gates re-entry until then.
  if (wasTransmitting && !mTransmitting) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Detaching pipeline %p from track %p conduit type=%s", this,
             mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
    mUnsettingSendTrack = true;
    if (mIsVideo) {
      mSendTrack->RemoveDirectListener(mListener);
    }
    mSendTrack->RemoveListener(mListener)->Then(
        GetMainThreadSerialEventTarget(), __func__,
        [this, self = RefPtr<MediaPipelineTransmit>(this)] {
          mUnsettingSendTrack = false;
          mSendTrack = nullptr;
          if (!mWatchManager.IsShutdown()) {
            mWatchManager.ManualNotify(&MediaPipelineTransmit::UpdateSendState);
          }
        });
    if (!mSendTrackOverride.Ref()) {
      // If an override is set it may be re-used.
      mSendTrack->Destroy();
      mSendPort->Destroy();
      mSendPort = nullptr;
      mSendPortSource = nullptr;
    }
  }
}
    984 
// Main thread. Reports the mirrored control state (mActive), i.e. whether we
// are supposed to be transmitting — not mTransmitting, which tracks whether
// the send track is currently hooked up to the listener.
bool MediaPipelineTransmit::Transmitting() const {
  MOZ_ASSERT(NS_IsMainThread());

  return mActive;
}
    990 
// True if this pipeline was created for video, false for audio.
bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; }
    992 
// PrincipalChangeObserver callback, fired when the sending track's principal
// changes. Re-evaluates whether real content may be sent via
// UpdateSinkIdentity, using the owning document's principal.
void MediaPipelineTransmit::PrincipalChanged(dom::MediaStreamTrack* aTrack) {
  MOZ_ASSERT(aTrack && aTrack == mDomTrack.Ref());

  PeerConnectionWrapper pcw(mPc);
  if (pcw.impl()) {
    Document* doc = pcw.impl()->GetParentObject()->GetExtantDoc();
    if (doc) {
      UpdateSinkIdentity(doc->NodePrincipal(), pcw.impl()->GetPeerIdentity());
    } else {
      MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
              ("Can't update sink principal; document gone"));
    }
  }
}
   1007 
   1008 void MediaPipelineTransmit::UpdateSinkIdentity(
   1009    nsIPrincipal* aPrincipal, const PeerIdentity* aSinkIdentity) {
   1010  MOZ_ASSERT(NS_IsMainThread());
   1011 
   1012  if (!mDomTrack.Ref()) {
   1013    // Nothing to do here
   1014    return;
   1015  }
   1016 
   1017  bool enableTrack = aPrincipal->Subsumes(mDomTrack.Ref()->GetPrincipal());
   1018  if (!enableTrack) {
   1019    // first try didn't work, but there's a chance that this is still available
   1020    // if our track is bound to a peerIdentity, and the peer connection (our
   1021    // sink) is bound to the same identity, then we can enable the track.
   1022    const PeerIdentity* trackIdentity = mDomTrack.Ref()->GetPeerIdentity();
   1023    if (aSinkIdentity && trackIdentity) {
   1024      enableTrack = (*aSinkIdentity == *trackIdentity);
   1025    }
   1026  }
   1027 
   1028  mListener->SetEnabled(enableTrack);
   1029 }
   1030 
void MediaPipelineTransmit::TransportReady_s() {
  ASSERT_ON_THREAD(mStsThread);
  // Call base ready function.
  MediaPipeline::TransportReady_s();
  // There is now a transport to send on; let the listener forward media.
  mListener->SetActive(true);
}
   1037 
   1038 nsresult MediaPipelineTransmit::SetTrack(
   1039    const RefPtr<MediaStreamTrack>& aDomTrack) {
   1040  MOZ_ASSERT(NS_IsMainThread());
   1041  if (mDomTrack.Ref()) {
   1042    mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
   1043  }
   1044 
   1045  if (aDomTrack) {
   1046    nsString nsTrackId;
   1047    aDomTrack->GetId(nsTrackId);
   1048    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
   1049            ("Reattaching pipeline to track %p track %s conduit type: %s",
   1050             aDomTrack.get(), NS_ConvertUTF16toUTF8(nsTrackId).get(),
   1051             mIsVideo ? "video" : "audio"));
   1052  }
   1053 
   1054  mDescriptionInvalidated = true;
   1055  mDomTrack = aDomTrack;
   1056  if (mDomTrack.Ref()) {
   1057    mDomTrack.Ref()->AddPrincipalChangeObserver(this);
   1058    PrincipalChanged(mDomTrack.Ref());
   1059  }
   1060 
   1061  return NS_OK;
   1062 }
   1063 
// Main thread only. Returns the currently set DOM track (may be null).
RefPtr<dom::MediaStreamTrack> MediaPipelineTransmit::GetTrack() const {
  MOZ_ASSERT(NS_IsMainThread());
  return mDomTrack;
}
   1068 
// Unittest hook (see UpdateSendState): supplies a send track directly,
// bypassing mDomTrack and the graph. May only be set once, and only before
// any send track or port exists.
void MediaPipelineTransmit::SetSendTrackOverride(
    const RefPtr<ProcessedMediaTrack>& aSendTrack) {
  MOZ_ASSERT(NS_IsMainThread());
  MOZ_RELEASE_ASSERT(!mSendTrack);
  MOZ_RELEASE_ASSERT(!mSendPort);
  MOZ_RELEASE_ASSERT(!mSendTrackOverride.Ref());
  mDescriptionInvalidated = true;
  mSendTrackOverride = aSendTrack;
}
   1078 
   1079 // Called if we're attached with AddDirectListener()
   1080 void MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData(
   1081    MediaTrackGraph* aGraph, TrackTime aOffset, const MediaSegment& aMedia) {
   1082  MOZ_LOG(
   1083      gMediaPipelineLog, LogLevel::Debug,
   1084      ("MediaPipeline::NotifyRealtimeTrackData() listener=%p, offset=%" PRId64
   1085       ", duration=%" PRId64,
   1086       this, aOffset, aMedia.GetDuration()));
   1087  TRACE_COMMENT(
   1088      "MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData", "%s",
   1089      aMedia.GetType() == MediaSegment::VIDEO ? "Video" : "Audio");
   1090  NewData(aMedia, aGraph->GraphRate());
   1091 }
   1092 
   1093 void MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges(
   1094    MediaTrackGraph* aGraph, TrackTime aOffset,
   1095    const MediaSegment& aQueuedMedia) {
   1096  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
   1097          ("MediaPipeline::NotifyQueuedChanges()"));
   1098 
   1099  if (aQueuedMedia.GetType() == MediaSegment::VIDEO) {
   1100    // We always get video from the direct listener.
   1101    return;
   1102  }
   1103 
   1104  TRACE("MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges (Audio)");
   1105 
   1106  if (mDirectConnect) {
   1107    // ignore non-direct data if we're also getting direct data
   1108    return;
   1109  }
   1110 
   1111  size_t rate;
   1112  if (aGraph) {
   1113    rate = aGraph->GraphRate();
   1114  } else {
   1115    // When running tests, graph may be null. In that case use a default.
   1116    rate = 16000;
   1117  }
   1118  NewData(aQueuedMedia, rate);
   1119 }
   1120 
// Forwards the track's enabled state to the video frame converter. Audio
// pipelines ignore this notification.
void MediaPipelineTransmit::PipelineListener::NotifyEnabledStateChanged(
    MediaTrackGraph* aGraph, bool aEnabled) {
  if (mConduit->type() != MediaSessionConduit::VIDEO) {
    return;
  }
  MOZ_ASSERT(mConverter);
  mConverter->SetTrackEnabled(aEnabled);
}
   1129 
   1130 void MediaPipelineTransmit::PipelineListener::NotifyDirectListenerInstalled(
   1131    InstallationResult aResult) {
   1132  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
   1133          ("MediaPipeline::NotifyDirectListenerInstalled() listener=%p,"
   1134           " result=%d",
   1135           this, static_cast<int32_t>(aResult)));
   1136 
   1137  mDirectConnect = InstallationResult::SUCCESS == aResult;
   1138 }
   1139 
void MediaPipelineTransmit::PipelineListener::
    NotifyDirectListenerUninstalled() {
  MOZ_LOG(
      gMediaPipelineLog, LogLevel::Info,
      ("MediaPipeline::NotifyDirectListenerUninstalled() listener=%p", this));

  if (mConduit->type() == MediaSessionConduit::VIDEO) {
    // Reset the converter's track-enabled state. If re-added to a new track
    // later and that track is disabled, we will be signaled explicitly.
    MOZ_ASSERT(mConverter);
    mConverter->SetTrackEnabled(true);
  }

  // Queued data is consumed again from here on (see NotifyQueuedChanges).
  mDirectConnect = false;
}
   1155 
// Routes a segment of media toward the conduit: audio chunks to the
// AudioProxyThread, video chunks to the VideoFrameConverter. aRate must be
// non-zero for audio.
void MediaPipelineTransmit::PipelineListener::NewData(
    const MediaSegment& aMedia, TrackRate aRate /* = 0 */) {
  if (mConduit->type() != (aMedia.GetType() == MediaSegment::AUDIO
                               ? MediaSessionConduit::AUDIO
                               : MediaSessionConduit::VIDEO)) {
    MOZ_ASSERT(false,
               "The media type should always be correct since the "
               "listener is locked to a specific track");
    return;
  }

  // TODO(ekr@rtfm.com): For now assume that we have only one
  // track type and it's destined for us
  // See bug 784517
  if (aMedia.GetType() == MediaSegment::AUDIO) {
    MOZ_RELEASE_ASSERT(aRate > 0);

    // Audio is dropped entirely until a transport is ready.
    if (!mActive) {
      MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
              ("Discarding audio packets because transport not ready"));
      return;
    }

    const AudioSegment* audio = static_cast<const AudioSegment*>(&aMedia);
    for (AudioSegment::ConstChunkIterator iter(*audio); !iter.IsEnded();
         iter.Next()) {
      mAudioProcessing->QueueAudioChunk(aRate, *iter, mEnabled);
    }
  } else {
    // Video is queued regardless of mActive; a disabled (black) frame is
    // signaled to the converter via the second argument.
    const VideoSegment* video = static_cast<const VideoSegment*>(&aMedia);

    for (VideoSegment::ConstChunkIterator iter(*video); !iter.IsEnded();
         iter.Next()) {
      mConverter->QueueVideoChunk(*iter, !mEnabled);
    }
  }
}
   1193 
// Common base for the audio and video receive listeners. Owns the reference
// to the SourceMediaTrack that received media is appended to.
class GenericReceiveListener : public MediaTrackListener {
 public:
  GenericReceiveListener(RefPtr<SourceMediaTrack> aSource,
                         TrackingId aTrackingId)
      : mSource(std::move(aSource)),
        mTrackingId(std::move(aTrackingId)),
        mIsAudio(mSource->mType == MediaSegment::AUDIO),
        mEnabled(false) {
    MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread());
    MOZ_DIAGNOSTIC_ASSERT(mSource, "Must be used with a SourceMediaTrack");
  }

  virtual ~GenericReceiveListener() = default;

  // Attach this listener to / detach it from the source track.
  void Init() { mSource->AddListener(this); }
  void Shutdown() { mSource->RemoveListener(this); }

  // Main thread. For audio this also toggles pulling on the source track,
  // which drives NotifyPull on the graph thread.
  void SetEnabled(bool aEnabled) {
    if (mEnabled == aEnabled) {
      return;
    }
    mEnabled = aEnabled;
    if (mIsAudio && !mSource->IsDestroyed()) {
      mSource->SetPullingEnabled(mEnabled);
    }
  }

 protected:
  const RefPtr<SourceMediaTrack> mSource;
  const TrackingId mTrackingId;
  const bool mIsAudio;
  // Main thread only.
  bool mEnabled;
};
   1228 
// Constructs a receive pipeline; all arguments are forwarded to the
// MediaPipeline base with DirectionType::RECEIVE.
MediaPipelineReceive::MediaPipelineReceive(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    RefPtr<MediaSessionConduit> aConduit)
    : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::RECEIVE,
                    std::move(aCallThread), std::move(aStsThread),
                    std::move(aConduit)),
      mWatchManager(this, AbstractThread::MainThread()) {
  // Propagate mActive changes to the sts thread and to the subclass listener.
  mWatchManager.Watch(mActive, &MediaPipeline::UpdateActive);
  mWatchManager.Watch(mActive, &MediaPipelineReceive::UpdateListener);
}
   1240 
// Out-of-line default destructor; cleanup happens in Shutdown().
MediaPipelineReceive::~MediaPipelineReceive() = default;
   1242 
// Mirror the controller's receiving state into mActive, which drives
// UpdateListener via the watch manager.
void MediaPipelineReceive::InitControl(
    MediaPipelineReceiveControlInterface* aControl) {
  aControl->CanonicalReceiving().ConnectMirror(&mActive);
}
   1247 
void MediaPipelineReceive::Shutdown() {
  MOZ_ASSERT(NS_IsMainThread());
  MediaPipeline::Shutdown();
  mWatchManager.Shutdown();
}
   1253 
// Pulls decoded audio out of the conduit on demand (NotifyPull on the graph
// thread) and appends it to the receiving SourceMediaTrack.
class MediaPipelineReceiveAudio::PipelineListener
    : public GenericReceiveListener {
 public:
  PipelineListener(RefPtr<SourceMediaTrack> aSource, TrackingId aTrackingId,
                   RefPtr<MediaSessionConduit> aConduit,
                   PrincipalHandle aPrincipalHandle, PrincipalPrivacy aPrivacy)
      : GenericReceiveListener(std::move(aSource), std::move(aTrackingId)),
        mConduit(std::move(aConduit)),
        // AudioSession conduit only supports 16, 32, 44.1 and 48kHz
        // This is an artificial limitation, it would however require more
        // changes to support any rates. If the sampling rate is not-supported,
        // we will use 48kHz instead.
        mRate(static_cast<AudioSessionConduit*>(mConduit.get())
                      ->IsSamplingFreqSupported(mSource->Graph()->GraphRate())
                  ? mSource->Graph()->GraphRate()
                  : WEBRTC_MAX_SAMPLE_RATE),
        mTaskQueue(TaskQueue::Create(
            GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER),
            "AudioPipelineListener")),
        mPlayedTicks(0),
        mAudioFrame(std::make_unique<webrtc::AudioFrame>()),
        mPrincipalHandle(std::move(aPrincipalHandle)),
        mPrivacy(aPrivacy),
        mForceSilence(false) {}

  void Init() {
    GenericReceiveListener::Init();
    // Data is appended at mRate, which may differ from the graph rate.
    mSource->SetAppendDataSourceRate(mRate);
  }

  // Implement MediaTrackListener
  void NotifyPull(MediaTrackGraph* aGraph, TrackTime aEndOfAppendedData,
                  TrackTime aDesiredTime) override {
    NotifyPullImpl(aDesiredTime);
  }

  // Sts thread. Forces silence until the principal has been made private via
  // SetPrivatePrincipal — unless it already is private.
  void OnPrivacyRequested_s() {
    if (mPrivacy == PrincipalPrivacy::Private) {
      return;
    }
    mForceSilence = true;
  }

  // Main thread. Queues a control message that swaps in the private principal
  // on the graph thread and lifts the forced silence.
  void SetPrivatePrincipal(PrincipalHandle aHandle) {
    MOZ_ASSERT(NS_IsMainThread());

    if (mSource->IsDestroyed()) {
      return;
    }

    mSource->QueueControlMessageWithNoShutdown(
        [self = RefPtr{this}, this,
         privatePrincipal = std::move(aHandle)]() mutable {
          if (mPrivacy == PrincipalPrivacy::Private) {
            return;
          }
          mPrincipalHandle = std::move(privatePrincipal);
          mPrivacy = PrincipalPrivacy::Private;
          mForceSilence = false;
        });
  }

 private:
  ~PipelineListener() = default;

  // Graph thread. Fetches 10ms audio frames from the conduit and appends them
  // to mSource until enough data has been delivered to reach aDesiredTime.
  void NotifyPullImpl(TrackTime aDesiredTime) {
    TRACE_COMMENT("PiplineListener::NotifyPullImpl", "PipelineListener %p",
                  this);
    uint32_t samplesPer10ms = mRate / 100;

    // mSource's rate is not necessarily the same as the graph rate, since there
    // are sample-rate constraints on the inbound audio: only 16, 32, 44.1 and
    // 48kHz are supported. The audio frames we get here is going to be
    // resampled when inserted into the graph. aDesiredTime and mPlayedTicks are
    // in the graph rate.

    while (mPlayedTicks < aDesiredTime) {
      // This fetches 10ms of data, either mono or stereo
      MediaConduitErrorCode err =
          static_cast<AudioSessionConduit*>(mConduit.get())
              ->GetAudioFrame(mRate, mAudioFrame.get());

      if (err != kMediaConduitNoError) {
        // Insert silence on conduit/GIPS failure (extremely unlikely)
        MOZ_LOG(gMediaPipelineLog, LogLevel::Error,
                ("Audio conduit failed (%d) to return data @ %" PRId64
                 " (desired %" PRId64 " -> %f)",
                 err, mPlayedTicks, aDesiredTime,
                 mSource->TrackTimeToSeconds(aDesiredTime)));
        constexpr size_t mono = 1;
        // Reuse the previous frame's metadata but zero out the samples.
        mAudioFrame->UpdateFrame(
            mAudioFrame->timestamp_, nullptr, samplesPer10ms, mRate,
            mAudioFrame->speech_type_, mAudioFrame->vad_activity_,
            std::max(mono, mAudioFrame->num_channels()));
      }

      MOZ_LOG(
          gMediaPipelineLog, LogLevel::Debug,
          ("Audio conduit returned buffer for %zu channels, %zu frames",
           mAudioFrame->num_channels(), mAudioFrame->samples_per_channel()));

      AudioSegment segment;
      if (mForceSilence || mAudioFrame->muted()) {
        // Privacy pending or muted frame: append silence of the same length.
        segment.AppendNullData(mAudioFrame->samples_per_channel());
      } else {
        // NOTE(review): sizeof(uint16_t) here — the buffer holds int16_t
        // samples; the sizes are identical so there is no behavior
        // difference, but sizeof(int16_t) would read truer.
        CheckedInt<size_t> bufferSize(sizeof(uint16_t));
        bufferSize *= mAudioFrame->samples_per_channel();
        bufferSize *= mAudioFrame->num_channels();
        RefPtr<SharedBuffer> samples = SharedBuffer::Create(bufferSize);
        int16_t* samplesData = static_cast<int16_t*>(samples->Data());
        AutoTArray<int16_t*, 2> channels;
        AutoTArray<const int16_t*, 2> outputChannels;

        channels.SetLength(mAudioFrame->num_channels());

        // Lay the channels out back-to-back in the shared buffer.
        size_t offset = 0;
        for (size_t i = 0; i < mAudioFrame->num_channels(); i++) {
          channels[i] = samplesData + offset;
          offset += mAudioFrame->samples_per_channel();
        }

        // Split the conduit's interleaved samples into per-channel planes.
        DeinterleaveAndConvertBuffer(
            mAudioFrame->data(), mAudioFrame->samples_per_channel(),
            mAudioFrame->num_channels(), channels.Elements());

        outputChannels.AppendElements(channels);

        segment.AppendFrames(samples.forget(), outputChannels,
                             mAudioFrame->samples_per_channel(),
                             mPrincipalHandle);
      }

      // Handle track not actually added yet or removed/finished
      if (TrackTime appended = mSource->AppendData(&segment)) {
        mPlayedTicks += appended;
      } else {
        MOZ_LOG(gMediaPipelineLog, LogLevel::Error, ("AppendData failed"));
        // we can't un-read the data, but that's ok since we don't want to
        // buffer - but don't i-loop!
        break;
      }
    }
  }

  const RefPtr<MediaSessionConduit> mConduit;
  // This conduit's sampling rate. This is either 16, 32, 44.1 or 48kHz, and
  // tries to be the same as the graph rate. If the graph rate is higher than
  // 48kHz, mRate is capped to 48kHz. If mRate does not match the graph rate,
  // audio is resampled to the graph rate.
  const TrackRate mRate;
  const RefPtr<TaskQueue> mTaskQueue;
  // Number of frames of data that has been added to the SourceMediaTrack in
  // the graph's rate. Graph thread only.
  TrackTicks mPlayedTicks;
  // Allocation of an audio frame used as a scratch buffer when reading data out
  // of libwebrtc for forwarding into the graph. Graph thread only.
  std::unique_ptr<webrtc::AudioFrame> mAudioFrame;
  // Principal handle used when appending data to the SourceMediaTrack. Graph
  // thread only.
  PrincipalHandle mPrincipalHandle;
  // Privacy of mPrincipalHandle. Graph thread only.
  PrincipalPrivacy mPrivacy;
  // Set to true on the sts thread if privacy is requested when ALPN was
  // negotiated. Set to false again when mPrincipalHandle is private.
  Atomic<bool> mForceSilence;
};
   1420 
// Constructs an audio receive pipeline. aSource may be null, in which case no
// listener is created and received audio is not delivered to a track.
MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    RefPtr<AudioSessionConduit> aConduit, RefPtr<SourceMediaTrack> aSource,
    TrackingId aTrackingId, PrincipalHandle aPrincipalHandle,
    PrincipalPrivacy aPrivacy)
    : MediaPipelineReceive(aPc, std::move(aTransportHandler),
                           std::move(aCallThread), std::move(aStsThread),
                           std::move(aConduit)),
      mListener(aSource ? new PipelineListener(
                              std::move(aSource), std::move(aTrackingId),
                              mConduit, std::move(aPrincipalHandle), aPrivacy)
                        : nullptr) {
  mDescription = mPc + "| Receive audio";
  if (mListener) {
    mListener->Init();
  }
}
   1439 
void MediaPipelineReceiveAudio::Shutdown() {
  MOZ_ASSERT(NS_IsMainThread());
  MediaPipelineReceive::Shutdown();
  // Detach the listener from the source track.
  if (mListener) {
    mListener->Shutdown();
  }
}
   1447 
// Sts thread. Forwards the privacy request to the listener, which silences
// output until a private principal is installed.
void MediaPipelineReceiveAudio::OnPrivacyRequested_s() {
  ASSERT_ON_THREAD(mStsThread);
  if (mListener) {
    mListener->OnPrivacyRequested_s();
  }
}
   1454 
// Main thread. Hands the private principal to the listener, which applies it
// on the graph thread.
void MediaPipelineReceiveAudio::SetPrivatePrincipal(PrincipalHandle aHandle) {
  MOZ_ASSERT(NS_IsMainThread());
  if (mListener) {
    mListener->SetPrivatePrincipal(std::move(aHandle));
  }
}
   1461 
// Main thread. Watch-manager callback for mActive: enables/disables pulling
// on the receive listener accordingly.
void MediaPipelineReceiveAudio::UpdateListener() {
  MOZ_ASSERT(NS_IsMainThread());
  if (mListener) {
    mListener->SetEnabled(mActive.Ref());
  }
}
   1468 
// Receives decoded frames from the conduit (via PipelineRenderer, on a
// non-main thread) and appends them to the pipeline's SourceMediaTrack.
// Principal/privacy state shared between threads is guarded by mMutex.
class MediaPipelineReceiveVideo::PipelineListener
    : public GenericReceiveListener {
 public:
  PipelineListener(RefPtr<SourceMediaTrack> aSource, TrackingId aTrackingId,
                   PrincipalHandle aPrincipalHandle, PrincipalPrivacy aPrivacy)
      : GenericReceiveListener(std::move(aSource), std::move(aTrackingId)),
        mImageContainer(MakeAndAddRef<ImageContainer>(
            ImageUsageType::Webrtc, ImageContainer::ASYNCHRONOUS)),
        mMutex("MediaPipelineReceiveVideo::PipelineListener::mMutex"),
        mPrincipalHandle(std::move(aPrincipalHandle)),
        mPrivacy(aPrivacy) {}
  // Called on the STS thread when privacy is requested. Starts dropping
  // frames until SetPrivatePrincipal() swaps in a private principal. No-op
  // if the principal is already private.
  void OnPrivacyRequested_s() {
    MutexAutoLock lock(mMutex);
    if (mPrivacy == PrincipalPrivacy::Private) {
      return;
    }
    mForceDropFrames = true;
  }

  // Installs a private principal and resumes frame delivery. No-op if the
  // principal is already private.
  void SetPrivatePrincipal(PrincipalHandle aHandle) {
    MutexAutoLock lock(mMutex);
    if (mPrivacy == PrincipalPrivacy::Private) {
      return;
    }
    mPrincipalHandle = std::move(aHandle);
    mPrivacy = PrincipalPrivacy::Private;
    mForceDropFrames = false;
  }

  // Delivers one decoded frame to mSource. Native buffers are unwrapped to
  // their backing Image directly; I420 buffers are copied into a fresh
  // PlanarYCbCrImage from mImageContainer.
  void RenderVideoFrame(const webrtc::VideoFrame& aVideoFrame) {
    PrincipalHandle principal;
    {
      // Snapshot the principal under the lock so the rest of the frame
      // handling can run unlocked.
      MutexAutoLock lock(mMutex);
      if (mForceDropFrames) {
        return;
      }
      principal = mPrincipalHandle;
    }
    RefPtr<Image> image;
    const webrtc::VideoFrameBuffer& buffer = *aVideoFrame.video_frame_buffer();
    if (buffer.type() == webrtc::VideoFrameBuffer::Type::kNative) {
      // We assume that only native handles are used with the
      // WebrtcMediaDataCodec decoder.
      const ImageBuffer* imageBuffer = static_cast<const ImageBuffer*>(&buffer);
      image = imageBuffer->GetNativeImage();
    } else {
      MOZ_ASSERT(buffer.type() == webrtc::VideoFrameBuffer::Type::kI420);
      webrtc::scoped_refptr<const webrtc::I420BufferInterface> i420(
          buffer.GetI420());

      MOZ_ASSERT(i420->DataY());
      // Create a video frame using |buffer|.
      PerformanceRecorder<CopyVideoStage> rec(
          "MediaPipelineReceiveVideo::CopyToImage"_ns, mTrackingId,
          i420->width(), i420->height());

      RefPtr<PlanarYCbCrImage> yuvImage =
          mImageContainer->CreatePlanarYCbCrImage();

      PlanarYCbCrData yuvData;
      yuvData.mYChannel = const_cast<uint8_t*>(i420->DataY());
      yuvData.mYStride = i420->StrideY();
      // I420 has a single chroma stride shared by both planes.
      MOZ_ASSERT(i420->StrideU() == i420->StrideV());
      yuvData.mCbCrStride = i420->StrideU();
      yuvData.mCbChannel = const_cast<uint8_t*>(i420->DataU());
      yuvData.mCrChannel = const_cast<uint8_t*>(i420->DataV());
      yuvData.mPictureRect = IntRect(0, 0, i420->width(), i420->height());
      yuvData.mStereoMode = StereoMode::MONO;
      // This isn't the best default.
      yuvData.mYUVColorSpace = gfx::YUVColorSpace::BT601;
      yuvData.mChromaSubsampling =
          gfx::ChromaSubsampling::HALF_WIDTH_AND_HEIGHT;

      if (NS_FAILED(yuvImage->CopyData(yuvData))) {
        MOZ_ASSERT(false);
        return;
      }
      rec.Record();

      image = std::move(yuvImage);
    }

    // Use the latest receive time across the RTP packets that make up this
    // frame.
    Maybe<webrtc::Timestamp> receiveTime;
    for (const auto& packet : aVideoFrame.packet_infos()) {
      if (!receiveTime || *receiveTime < packet.receive_time()) {
        receiveTime = Some(packet.receive_time());
      }
    }

    // NOTE(review): |image| is assumed non-null here (GetNativeImage() /
    // CreatePlanarYCbCrImage() presumed infallible) — GetSize() would crash
    // otherwise; confirm against those implementations.
    VideoSegment segment;
    auto size = image->GetSize();
    auto processingDuration =
        aVideoFrame.processing_time()
            ? media::TimeUnit::FromMicroseconds(
                  aVideoFrame.processing_time()->Elapsed().us())
            : media::TimeUnit::Invalid();
    segment.AppendWebrtcRemoteFrame(
        image.forget(), size, principal,
        /* aForceBlack */ false, TimeStamp::Now(), processingDuration,
        aVideoFrame.rtp_timestamp(), aVideoFrame.ntp_time_ms(),
        receiveTime ? receiveTime->us() : 0);
    mSource->AppendData(&segment);
  }

 private:
  // Source of PlanarYCbCrImages for the I420 copy path.
  RefPtr<layers::ImageContainer> mImageContainer;
  // Guards the principal/privacy members below (written on main/STS threads,
  // read on the frame-delivery thread).
  Mutex mMutex;
  PrincipalHandle mPrincipalHandle MOZ_GUARDED_BY(mMutex);
  PrincipalPrivacy mPrivacy MOZ_GUARDED_BY(mMutex);
  // Set to true on the sts thread if privacy is requested when ALPN was
  // negotiated. Set to false again when mPrincipalHandle is private.
  bool mForceDropFrames MOZ_GUARDED_BY(mMutex) = false;
};
   1582 
   1583 class MediaPipelineReceiveVideo::PipelineRenderer
   1584    : public mozilla::VideoRenderer {
   1585 public:
   1586  explicit PipelineRenderer(MediaPipelineReceiveVideo* aPipeline)
   1587      : mPipeline(aPipeline) {}
   1588 
   1589  void Detach() { mPipeline = nullptr; }
   1590 
   1591  // Implement VideoRenderer
   1592  void RenderVideoFrame(const webrtc::VideoFrame& aVideoFrame) override {
   1593    mPipeline->mListener->RenderVideoFrame(aVideoFrame);
   1594  }
   1595 
   1596 private:
   1597  MediaPipelineReceiveVideo* mPipeline;  // Raw pointer to avoid cycles
   1598 };
   1599 
// Builds the receive-video pipeline. A null aSource means there is no media
// track to feed; the pipeline then runs without a listener (all listener
// calls elsewhere are null-checked), but the renderer is attached regardless.
MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    RefPtr<VideoSessionConduit> aConduit, RefPtr<SourceMediaTrack> aSource,
    TrackingId aTrackingId, PrincipalHandle aPrincipalHandle,
    PrincipalPrivacy aPrivacy)
    : MediaPipelineReceive(aPc, std::move(aTransportHandler),
                           std::move(aCallThread), std::move(aStsThread),
                           std::move(aConduit)),
      mRenderer(new PipelineRenderer(this)),
      mListener(aSource ? new PipelineListener(
                              std::move(aSource), std::move(aTrackingId),
                              std::move(aPrincipalHandle), aPrivacy)
                        : nullptr) {
  mDescription = mPc + "| Receive video";
  if (mListener) {
    mListener->Init();
  }
  // mConduit was set by the MediaPipelineReceive base ctor; attach the
  // renderer so decoded frames reach PipelineRenderer::RenderVideoFrame().
  static_cast<VideoSessionConduit*>(mConduit.get())->AttachRenderer(mRenderer);
}
   1620 
   1621 void MediaPipelineReceiveVideo::Shutdown() {
   1622  MOZ_ASSERT(NS_IsMainThread());
   1623  MediaPipelineReceive::Shutdown();
   1624  if (mListener) {
   1625    mListener->Shutdown();
   1626  }
   1627 
   1628  // stop generating video and thus stop invoking the PipelineRenderer
   1629  // and PipelineListener - the renderer has a raw ptr to the Pipeline to
   1630  // avoid cycles, and the render callbacks are invoked from a different
   1631  // thread so simple null-checks would cause TSAN bugs without locks.
   1632  static_cast<VideoSessionConduit*>(mConduit.get())->DetachRenderer();
   1633 }
   1634 
   1635 void MediaPipelineReceiveVideo::OnPrivacyRequested_s() {
   1636  ASSERT_ON_THREAD(mStsThread);
   1637  if (mListener) {
   1638    mListener->OnPrivacyRequested_s();
   1639  }
   1640 }
   1641 
   1642 void MediaPipelineReceiveVideo::SetPrivatePrincipal(PrincipalHandle aHandle) {
   1643  MOZ_ASSERT(NS_IsMainThread());
   1644  if (mListener) {
   1645    mListener->SetPrivatePrincipal(std::move(aHandle));
   1646  }
   1647 }
   1648 
   1649 void MediaPipelineReceiveVideo::UpdateListener() {
   1650  MOZ_ASSERT(NS_IsMainThread());
   1651  if (mListener) {
   1652    mListener->SetEnabled(mActive.Ref());
   1653  }
   1654 }
   1655 
// Stats timestamps come from the conduit's timestamp maker so that all stats
// produced for this pipeline share a single clock.
const dom::RTCStatsTimestampMaker& MediaPipeline::GetTimestampMaker() const {
  return mConduit->GetTimestampMaker();
}
   1659 
   1660 DOMHighResTimeStamp MediaPipeline::RtpCSRCStats::GetExpiryFromTime(
   1661    const DOMHighResTimeStamp aTime) {
   1662  // DOMHighResTimeStamp is a unit measured in ms
   1663  return aTime + EXPIRY_TIME_MILLISECONDS;
   1664 }
   1665 
// Stats entry for one contributing source (CSRC). aTime is a
// DOMHighResTimeStamp, i.e. milliseconds.
MediaPipeline::RtpCSRCStats::RtpCSRCStats(const uint32_t aCsrc,
                                          const DOMHighResTimeStamp aTime)
    : mCsrc(aCsrc), mTimestamp(aTime) {}
   1669 
   1670 void MediaPipeline::RtpCSRCStats::GetWebidlInstance(
   1671    dom::RTCRTPContributingSourceStats& aWebidlObj,
   1672    const nsString& aInboundRtpStreamId) const {
   1673  nsString statId = u"csrc_"_ns + aInboundRtpStreamId;
   1674  statId.AppendLiteral("_");
   1675  statId.AppendInt(mCsrc);
   1676  aWebidlObj.mId.Construct(statId);
   1677  aWebidlObj.mType.Construct(RTCStatsType::Csrc);
   1678  aWebidlObj.mTimestamp.Construct(mTimestamp);
   1679  aWebidlObj.mContributorSsrc.Construct(mCsrc);
   1680  aWebidlObj.mInboundRtpStreamId.Construct(aInboundRtpStreamId);
   1681 }
   1682 
   1683 }  // namespace mozilla