Pacer.h (7434B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim:set ts=2 sw=2 sts=2 et cindent: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 5 * You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "MediaEventSource.h" 8 #include "MediaTimer.h" 9 #include "nsDeque.h" 10 11 #ifndef DOM_MEDIA_PACER_H_ 12 # define DOM_MEDIA_PACER_H_ 13 14 extern mozilla::LazyLogModule gMediaPipelineLog; 15 # define LOG(level, msg, ...) \ 16 MOZ_LOG(gMediaPipelineLog, level, (msg, ##__VA_ARGS__)) 17 18 namespace mozilla { 19 20 /** 21 * Pacer<T> takes a queue of Ts tied to timestamps, and emits PacedItemEvents 22 * for every T at its corresponding timestamp. 23 * 24 * The queue is ordered. Enqueing an item at time t will drop all items at times 25 * later than T. This is because of how video sources work (some send out frames 26 * in the future, some don't), and to allow swapping one source for another. 27 * 28 * It supports a duplication interval. If there is no new item enqueued within 29 * the duplication interval since the last enqueued item, the last enqueud item 30 * is emitted again. 31 */ 32 template <typename T> 33 class Pacer { 34 public: 35 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Pacer) 36 37 Pacer(already_AddRefed<nsISerialEventTarget> aTarget, 38 TimeDuration aDuplicationInterval) 39 : mTarget(aTarget), 40 mDuplicationInterval(aDuplicationInterval), 41 mTimer(MakeAndAddRef<MediaTimer<TimeStamp>>()) { 42 LOG(LogLevel::Info, "Pacer %p constructed. Duplication interval is %.2fms", 43 this, mDuplicationInterval.ToMilliseconds()); 44 } 45 46 /** 47 * Enqueues an item and schedules a timer to pass it on to PacedItemEvent() at 48 * t=aTime. Already queued items with t>=aTime will be dropped. 49 */ 50 void Enqueue(T aItem, TimeStamp aTime) { 51 LOG(LogLevel::Verbose, "Pacer %p: Enqueue t=%.4fs now=%.4fs", this, 52 (aTime - mStart).ToSeconds(), (TimeStamp::Now() - mStart).ToSeconds()); 53 MOZ_ALWAYS_SUCCEEDS(mTarget->Dispatch(NS_NewRunnableFunction( 54 __func__, 55 [this, self = RefPtr<Pacer>(this), aItem = std::move(aItem), aTime] { 56 MOZ_DIAGNOSTIC_ASSERT(!mIsShutdown); 57 LOG(LogLevel::Verbose, "Pacer %p: InnerEnqueue t=%.4fs, now=%.4fs", 58 self.get(), (aTime - mStart).ToSeconds(), 59 (TimeStamp::Now() - mStart).ToSeconds()); 60 while (const auto* item = mQueue.Peek()) { 61 if (item->mTime < aTime) { 62 break; 63 } 64 RefPtr<QueueItem> dropping = mQueue.Pop(); 65 } 66 mQueue.Push(MakeAndAddRef<QueueItem>(std::move(aItem), aTime, false)); 67 EnsureTimerScheduled(aTime); 68 }))); 69 } 70 71 void SetDuplicationInterval(TimeDuration aInterval) { 72 LOG(LogLevel::Info, "Pacer %p: SetDuplicationInterval(%.3fs) now=%.4fs", 73 this, aInterval.ToSeconds(), (TimeStamp::Now() - mStart).ToSeconds()); 74 MOZ_ALWAYS_SUCCEEDS(mTarget->Dispatch(NS_NewRunnableFunction( 75 __func__, [this, self = RefPtr(this), aInterval] { 76 LOG(LogLevel::Debug, 77 "Pacer %p: InnerSetDuplicationInterval(%.3fs) now=%.4fs", 78 self.get(), aInterval.ToSeconds(), 79 (TimeStamp::Now() - mStart).ToSeconds()); 80 if (auto* next = mQueue.PeekFront(); next && next->mIsDuplicate) { 81 // Adjust the time of the next duplication frame. 82 next->mTime = 83 std::max(TimeStamp::Now(), 84 next->mTime - mDuplicationInterval + aInterval); 85 EnsureTimerScheduled(next->mTime); 86 } 87 mDuplicationInterval = aInterval; 88 }))); 89 } 90 91 RefPtr<GenericPromise> Shutdown() { 92 LOG(LogLevel::Info, "Pacer %p: Shutdown, now=%.4fs", this, 93 (TimeStamp::Now() - mStart).ToSeconds()); 94 return InvokeAsync(mTarget, __func__, [this, self = RefPtr<Pacer>(this)] { 95 LOG(LogLevel::Debug, "Pacer %p: InnerShutdown, now=%.4fs", self.get(), 96 (TimeStamp::Now() - mStart).ToSeconds()); 97 mIsShutdown = true; 98 mTimer->Cancel(); 99 mQueue.Erase(); 100 mCurrentTimerTarget = Nothing(); 101 return GenericPromise::CreateAndResolve(true, "Pacer::Shutdown"); 102 }); 103 } 104 105 MediaEventSourceExc<T, TimeStamp>& PacedItemEvent() { 106 return mPacedItemEvent; 107 } 108 109 protected: 110 ~Pacer() = default; 111 112 void EnsureTimerScheduled(TimeStamp aTime) { 113 if (mCurrentTimerTarget && *mCurrentTimerTarget <= aTime) { 114 return; 115 } 116 117 if (mCurrentTimerTarget) { 118 mTimer->Cancel(); 119 mCurrentTimerTarget = Nothing(); 120 } 121 122 LOG(LogLevel::Verbose, "Pacer %p: Waiting until t=%.4fs", this, 123 (aTime - mStart).ToSeconds()); 124 mTimer->WaitUntil(aTime, __func__) 125 ->Then( 126 mTarget, __func__, 127 [this, self = RefPtr<Pacer>(this), aTime] { 128 LOG(LogLevel::Verbose, "Pacer %p: OnTimerTick t=%.4fs, now=%.4fs", 129 self.get(), (aTime - mStart).ToSeconds(), 130 (TimeStamp::Now() - mStart).ToSeconds()); 131 OnTimerTick(); 132 }, 133 [] { 134 // Timer was rejected. This is fine. 135 }); 136 mCurrentTimerTarget = Some(aTime); 137 } 138 139 void OnTimerTick() { 140 MOZ_ASSERT(mTarget->IsOnCurrentThread()); 141 142 mCurrentTimerTarget = Nothing(); 143 144 while (RefPtr<QueueItem> item = mQueue.PopFront()) { 145 auto now = TimeStamp::Now(); 146 147 if (item->mTime <= now) { 148 // It's time to process this item. 149 if (const auto& next = mQueue.PeekFront(); 150 !next || next->mTime > (item->mTime + mDuplicationInterval)) { 151 // No future frame within the duplication interval exists. Schedule 152 // a copy. 153 mQueue.PushFront(MakeAndAddRef<QueueItem>( 154 item->mItem, item->mTime + mDuplicationInterval, true)); 155 } 156 LOG(LogLevel::Verbose, "Pacer %p: NotifyPacedItem t=%.4fs, now=%.4fs", 157 this, (item->mTime - mStart).ToSeconds(), 158 (TimeStamp::Now() - mStart).ToSeconds()); 159 mPacedItemEvent.Notify(std::move(item->mItem), item->mTime); 160 continue; 161 } 162 163 // This item is in the future. Put it back. 164 mQueue.PushFront(item.forget()); 165 break; 166 } 167 168 if (const auto& next = mQueue.PeekFront(); next) { 169 // The queue is not empty. Schedule the timer. 170 EnsureTimerScheduled(next->mTime); 171 } 172 } 173 174 public: 175 const nsCOMPtr<nsISerialEventTarget> mTarget; 176 177 # ifdef MOZ_LOGGING 178 const TimeStamp mStart = TimeStamp::Now(); 179 # endif 180 181 protected: 182 struct QueueItem { 183 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(QueueItem) 184 185 QueueItem(T aItem, TimeStamp aTime, bool aIsDuplicate) 186 : mItem(std::forward<T>(aItem)), 187 mTime(aTime), 188 mIsDuplicate(aIsDuplicate) { 189 MOZ_ASSERT(!aTime.IsNull()); 190 } 191 192 T mItem; 193 TimeStamp mTime; 194 bool mIsDuplicate; 195 196 private: 197 ~QueueItem() = default; 198 }; 199 200 // Accessed on mTarget. 201 nsRefPtrDeque<QueueItem> mQueue; 202 203 // Maximum interval at which a frame should be issued, even if it means 204 // duplicating the previous. 205 TimeDuration mDuplicationInterval; 206 207 // Accessed on mTarget. 208 RefPtr<MediaTimer<TimeStamp>> mTimer; 209 210 // Accessed on mTarget. 211 Maybe<TimeStamp> mCurrentTimerTarget; 212 213 // Accessed on mTarget. 214 bool mIsShutdown = false; 215 216 MediaEventProducerExc<T, TimeStamp> mPacedItemEvent; 217 }; 218 219 } // namespace mozilla 220 221 # undef LOG 222 223 #endif