rtc_event_log_impl.cc (13105B)
1 /* 2 * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "logging/rtc_event_log/rtc_event_log_impl.h" 12 13 #include <cstddef> 14 #include <cstdint> 15 #include <functional> 16 #include <iterator> 17 #include <memory> 18 #include <string> 19 #include <utility> 20 21 #include "absl/strings/string_view.h" 22 #include "api/environment/environment.h" 23 #include "api/field_trials_view.h" 24 #include "api/rtc_event_log/rtc_event.h" 25 #include "api/rtc_event_log/rtc_event_log.h" 26 #include "api/rtc_event_log_output.h" 27 #include "api/sequence_checker.h" 28 #include "api/task_queue/task_queue_base.h" 29 #include "api/task_queue/task_queue_factory.h" 30 #include "api/units/time_delta.h" 31 #include "logging/rtc_event_log/encoder/rtc_event_log_encoder.h" 32 #include "logging/rtc_event_log/encoder/rtc_event_log_encoder_legacy.h" 33 #include "logging/rtc_event_log/encoder/rtc_event_log_encoder_new_format.h" 34 #include "rtc_base/checks.h" 35 #include "rtc_base/event.h" 36 #include "rtc_base/logging.h" 37 #include "rtc_base/numerics/safe_minmax.h" 38 #include "rtc_base/synchronization/mutex.h" 39 #include "rtc_base/time_utils.h" 40 41 namespace webrtc { 42 namespace { 43 44 std::unique_ptr<RtcEventLogEncoder> CreateEncoder(const Environment& env) { 45 if (env.field_trials().IsDisabled("WebRTC-RtcEventLogNewFormat")) { 46 RTC_DLOG(LS_INFO) << "Creating legacy encoder for RTC event log."; 47 return std::make_unique<RtcEventLogEncoderLegacy>(); 48 } else { 49 RTC_DLOG(LS_INFO) << "Creating new format encoder for RTC event log."; 50 return std::make_unique<RtcEventLogEncoderNewFormat>(env.field_trials()); 51 } 52 } 53 54 } // namespace 55 56 RtcEventLogImpl::RtcEventLogImpl(const Environment& env) 57 : RtcEventLogImpl(env, CreateEncoder(env)) {} 58 59 RtcEventLogImpl::RtcEventLogImpl(const Environment& env, 60 std::unique_ptr<RtcEventLogEncoder> encoder, 61 size_t max_events_in_history, 62 size_t max_config_events_in_history) 63 : env_(env), 64 max_events_in_history_(max_events_in_history), 65 max_config_events_in_history_(max_config_events_in_history), 66 event_encoder_(std::move(encoder)), 67 last_output_ms_(env_.clock().TimeInMilliseconds()), 68 task_queue_(env_.task_queue_factory().CreateTaskQueue( 69 "rtc_event_log", 70 TaskQueueFactory::Priority::NORMAL)) {} 71 72 RtcEventLogImpl::~RtcEventLogImpl() { 73 // If we're logging to the output, this will stop that. Blocking function. 74 mutex_.Lock(); 75 bool started = logging_state_started_; 76 mutex_.Unlock(); 77 78 if (started) { 79 logging_state_checker_.Detach(); 80 StopLogging(); 81 } 82 83 // Since we are posting tasks bound to `this`, it is critical that the event 84 // log and its members outlive `task_queue_`. Destruct `task_queue_` first 85 // to ensure tasks living on the queue can access other members. 86 // We want to block on any executing task by deleting TaskQueue before 87 // we set unique_ptr's internal pointer to null. 88 task_queue_.get_deleter()(task_queue_.get()); 89 task_queue_.release(); 90 } 91 92 bool RtcEventLogImpl::StartLogging(std::unique_ptr<RtcEventLogOutput> output, 93 int64_t output_period_ms) { 94 RTC_DCHECK(output); 95 RTC_DCHECK(output_period_ms == kImmediateOutput || output_period_ms > 0); 96 97 if (!output->IsActive()) { 98 // TODO(eladalon): We may want to remove the IsActive method. Otherwise 99 // we probably want to be consistent and terminate any existing output. 100 return false; 101 } 102 103 const int64_t timestamp_us = env_.clock().TimeInMicroseconds(); 104 const int64_t utc_time_us = TimeUTCMillis() * 1000; 105 RTC_LOG(LS_INFO) << "Starting WebRTC event log. (Timestamp, UTC) = (" 106 << timestamp_us << ", " << utc_time_us << ")."; 107 108 RTC_DCHECK_RUN_ON(&logging_state_checker_); 109 MutexLock lock(&mutex_); 110 logging_state_started_ = true; 111 immediately_output_mode_ = (output_period_ms == kImmediateOutput); 112 need_schedule_output_ = (output_period_ms != kImmediateOutput); 113 114 // Binding to `this` is safe because `this` outlives the `task_queue_`. 115 task_queue_->PostTask([this, output_period_ms, timestamp_us, utc_time_us, 116 output = std::move(output), 117 histories = ExtractRecentHistories()]() mutable { 118 RTC_DCHECK_RUN_ON(task_queue_.get()); 119 RTC_DCHECK(output); 120 RTC_DCHECK(output->IsActive()); 121 output_period_ms_ = output_period_ms; 122 event_output_ = std::move(output); 123 124 WriteToOutput(event_encoder_->EncodeLogStart(timestamp_us, utc_time_us)); 125 // Load all configs of previous sessions. 126 if (!all_config_history_.empty()) { 127 EventDeque& history = histories.config_history; 128 history.insert(history.begin(), 129 std::make_move_iterator(all_config_history_.begin()), 130 std::make_move_iterator(all_config_history_.end())); 131 all_config_history_.clear(); 132 133 if (history.size() > max_config_events_in_history_) { 134 RTC_LOG(LS_WARNING) 135 << "Dropping config events: " << history.size() 136 << " exceeds maximum " << max_config_events_in_history_; 137 history.erase(history.begin(), history.begin() + history.size() - 138 max_config_events_in_history_); 139 } 140 } 141 LogEventsToOutput(std::move(histories)); 142 }); 143 144 return true; 145 } 146 147 void RtcEventLogImpl::StopLogging() { 148 RTC_DLOG(LS_INFO) << "Stopping WebRTC event log."; 149 // TODO(bugs.webrtc.org/14449): Do not block current thread waiting on the 150 // task queue. It might work for now, for current callers, but disallows 151 // caller to share threads with the `task_queue_`. 152 Event output_stopped; 153 StopLogging([&output_stopped]() { output_stopped.Set(); }); 154 output_stopped.Wait(Event::kForever); 155 156 RTC_DLOG(LS_INFO) << "WebRTC event log successfully stopped."; 157 } 158 159 void RtcEventLogImpl::StopLogging(std::function<void()> callback) { 160 RTC_DCHECK_RUN_ON(&logging_state_checker_); 161 MutexLock lock(&mutex_); 162 logging_state_started_ = false; 163 task_queue_->PostTask( 164 [this, callback, histories = ExtractRecentHistories()]() mutable { 165 RTC_DCHECK_RUN_ON(task_queue_.get()); 166 if (event_output_) { 167 RTC_DCHECK(event_output_->IsActive()); 168 LogEventsToOutput(std::move(histories)); 169 } 170 StopLoggingInternal(); 171 callback(); 172 }); 173 } 174 175 RtcEventLogImpl::EventHistories RtcEventLogImpl::ExtractRecentHistories() { 176 EventHistories histories; 177 std::swap(histories, recent_); 178 return histories; 179 } 180 181 void RtcEventLogImpl::Log(std::unique_ptr<RtcEvent> event) { 182 RTC_CHECK(event); 183 MutexLock lock(&mutex_); 184 185 LogToMemory(std::move(event)); 186 if (logging_state_started_) { 187 if (ShouldOutputImmediately()) { 188 // Binding to `this` is safe because `this` outlives the `task_queue_`. 189 task_queue_->PostTask( 190 [this, histories = ExtractRecentHistories()]() mutable { 191 RTC_DCHECK_RUN_ON(task_queue_.get()); 192 if (event_output_) { 193 RTC_DCHECK(event_output_->IsActive()); 194 LogEventsToOutput(std::move(histories)); 195 } 196 }); 197 } else if (need_schedule_output_) { 198 need_schedule_output_ = false; 199 // Binding to `this` is safe because `this` outlives the `task_queue_`. 200 task_queue_->PostTask([this]() mutable { 201 RTC_DCHECK_RUN_ON(task_queue_.get()); 202 if (event_output_) { 203 RTC_DCHECK(event_output_->IsActive()); 204 ScheduleOutput(); 205 } 206 }); 207 } 208 } 209 } 210 211 bool RtcEventLogImpl::ShouldOutputImmediately() { 212 if (recent_.history.size() >= max_events_in_history_) { 213 // We have to emergency drain the buffer. We can't wait for the scheduled 214 // output task because there might be other event incoming before that. 215 return true; 216 } 217 218 return immediately_output_mode_; 219 } 220 221 void RtcEventLogImpl::ScheduleOutput() { 222 RTC_DCHECK(output_period_ms_ != kImmediateOutput); 223 // Binding to `this` is safe because `this` outlives the `task_queue_`. 224 auto output_task = [this]() { 225 RTC_DCHECK_RUN_ON(task_queue_.get()); 226 // Allow scheduled output if the `event_output_` is valid. 227 if (event_output_) { 228 RTC_DCHECK(event_output_->IsActive()); 229 mutex_.Lock(); 230 RTC_DCHECK(!need_schedule_output_); 231 // Let the next `Log()` to schedule output. 232 need_schedule_output_ = true; 233 EventHistories histories = ExtractRecentHistories(); 234 mutex_.Unlock(); 235 LogEventsToOutput(std::move(histories)); 236 } 237 }; 238 const int64_t now_ms = env_.clock().TimeInMilliseconds(); 239 const int64_t time_since_output_ms = now_ms - last_output_ms_; 240 const int32_t delay = 241 SafeClamp(output_period_ms_ - time_since_output_ms, 0, output_period_ms_); 242 task_queue_->PostDelayedTask(std::move(output_task), 243 TimeDelta::Millis(delay)); 244 } 245 246 void RtcEventLogImpl::LogToMemory(std::unique_ptr<RtcEvent> event) { 247 EventDeque& container = 248 event->IsConfigEvent() ? recent_.config_history : recent_.history; 249 const size_t container_max_size = event->IsConfigEvent() 250 ? max_config_events_in_history_ 251 : max_events_in_history_; 252 253 // Shouldn't lose events if started. 254 if (container.size() >= container_max_size && !logging_state_started_) { 255 container.pop_front(); 256 } 257 container.push_back(std::move(event)); 258 } 259 260 void RtcEventLogImpl::LogEventsToOutput(EventHistories histories) { 261 last_output_ms_ = env_.clock().TimeInMilliseconds(); 262 263 // Serialize the stream configurations. 264 std::string encoded_configs = event_encoder_->EncodeBatch( 265 histories.config_history.begin(), histories.config_history.end()); 266 267 // Serialize the events in the event queue. Note that the write may fail, 268 // for example if we are writing to a file and have reached the maximum limit. 269 // We don't get any feedback if this happens, so we still remove the events 270 // from the event log history. This is normally not a problem, but if another 271 // log is started immediately after the first one becomes full, then one 272 // cannot rely on the second log to contain everything that isn't in the first 273 // log; one batch of events might be missing. 274 std::string encoded_history = event_encoder_->EncodeBatch( 275 histories.history.begin(), histories.history.end()); 276 277 WriteConfigsAndHistoryToOutput(encoded_configs, encoded_history); 278 279 // Unlike other events, the configs are retained. If we stop/start logging 280 // again, these configs are used to interpret other events. 281 all_config_history_.insert( 282 all_config_history_.end(), 283 std::make_move_iterator(histories.config_history.begin()), 284 std::make_move_iterator(histories.config_history.end())); 285 if (all_config_history_.size() > max_config_events_in_history_) { 286 RTC_LOG(LS_WARNING) << "Dropping config events: " 287 << all_config_history_.size() << " exceeds maximum " 288 << max_config_events_in_history_; 289 all_config_history_.erase(all_config_history_.begin(), 290 all_config_history_.begin() + 291 all_config_history_.size() - 292 max_config_events_in_history_); 293 } 294 } 295 296 void RtcEventLogImpl::WriteConfigsAndHistoryToOutput( 297 absl::string_view encoded_configs, 298 absl::string_view encoded_history) { 299 // This function is used to merge the strings instead of calling the output 300 // object twice with small strings. The function also avoids copying any 301 // strings in the typical case where there are no config events. 302 if (encoded_configs.empty()) { 303 WriteToOutput(encoded_history); // Typical case. 304 } else if (encoded_history.empty()) { 305 WriteToOutput(encoded_configs); // Very unusual case. 306 } else { 307 std::string s; 308 s.reserve(encoded_configs.size() + encoded_history.size()); 309 s.append(encoded_configs.data(), encoded_configs.size()); 310 s.append(encoded_history.data(), encoded_history.size()); 311 WriteToOutput(s); 312 } 313 } 314 315 void RtcEventLogImpl::StopOutput() { 316 event_output_.reset(); 317 } 318 319 void RtcEventLogImpl::StopLoggingInternal() { 320 if (event_output_) { 321 RTC_DCHECK(event_output_->IsActive()); 322 const int64_t timestamp_us = env_.clock().TimeInMicroseconds(); 323 event_output_->Write(event_encoder_->EncodeLogEnd(timestamp_us)); 324 } 325 StopOutput(); 326 } 327 328 void RtcEventLogImpl::WriteToOutput(absl::string_view output_string) { 329 if (event_output_) { 330 RTC_DCHECK(event_output_->IsActive()); 331 if (!event_output_->Write(output_string)) { 332 RTC_LOG(LS_ERROR) << "Failed to write RTC event to output."; 333 // The first failure closes the output. 334 RTC_DCHECK(!event_output_->IsActive()); 335 StopOutput(); // Clean-up. 336 } 337 } 338 } 339 340 } // namespace webrtc