CacheIOThread.cpp (14486B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 #include "CacheIOThread.h" 6 #include "CacheFileIOManager.h" 7 #include "CacheLog.h" 8 #include "CacheObserver.h" 9 #include "GeckoProfiler.h" 10 11 #include "nsIRunnable.h" 12 #include "nsISupportsImpl.h" 13 #include "nsPrintfCString.h" 14 #include "nsThread.h" 15 #include "nsThreadManager.h" 16 #include "nsThreadUtils.h" 17 #include "mozilla/EventQueue.h" 18 #include "mozilla/IOInterposer.h" 19 #include "mozilla/ProfilerLabels.h" 20 #include "mozilla/ThreadEventQueue.h" 21 22 #ifdef XP_WIN 23 # include <windows.h> 24 #endif 25 26 namespace mozilla::net { 27 28 namespace detail { 29 30 /** 31 * Helper class encapsulating platform-specific code to cancel 32 * any pending IO operation taking too long. Solely used during 33 * shutdown to prevent any IO shutdown hangs. 34 * Mainly designed for using Win32 CancelSynchronousIo function. 35 */ 36 class NativeThreadHandle { 37 #ifdef XP_WIN 38 // The native handle to the thread 39 HANDLE mThread; 40 #endif 41 42 public: 43 // Created and destroyed on the main thread only 44 NativeThreadHandle(); 45 ~NativeThreadHandle(); 46 47 // Called on the IO thread to grab the platform specific 48 // reference to it. 49 void InitThread(); 50 // If there is a blocking operation being handled on the IO 51 // thread, this is called on the main thread during shutdown. 52 void CancelBlockingIO(Monitor& aMonitor); 53 }; 54 55 #ifdef XP_WIN 56 57 NativeThreadHandle::NativeThreadHandle() : mThread(NULL) {} 58 59 NativeThreadHandle::~NativeThreadHandle() { 60 if (mThread) { 61 CloseHandle(mThread); 62 } 63 } 64 65 void NativeThreadHandle::InitThread() { 66 // GetCurrentThread() only returns a pseudo handle, hence DuplicateHandle 67 ::DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), 68 GetCurrentProcess(), &mThread, 0, FALSE, 69 DUPLICATE_SAME_ACCESS); 70 } 71 72 void NativeThreadHandle::CancelBlockingIO(Monitor& aMonitor) { 73 HANDLE thread; 74 { 75 MonitorAutoLock lock(aMonitor); 76 thread = mThread; 77 78 if (!thread) { 79 return; 80 } 81 } 82 83 LOG(("CacheIOThread: Attempting to cancel a long blocking IO operation")); 84 BOOL result = ::CancelSynchronousIo(thread); 85 if (result) { 86 LOG((" cancelation signal succeeded")); 87 } else { 88 DWORD error = GetLastError(); 89 LOG((" cancelation signal failed with GetLastError=%lu", error)); 90 } 91 } 92 93 #else // WIN 94 95 // Stub code only (we don't implement IO cancelation for this platform) 96 97 NativeThreadHandle::NativeThreadHandle() = default; 98 NativeThreadHandle::~NativeThreadHandle() = default; 99 void NativeThreadHandle::InitThread() {} 100 void NativeThreadHandle::CancelBlockingIO(Monitor&) {} 101 102 #endif 103 104 } // namespace detail 105 106 CacheIOThread* CacheIOThread::sSelf = nullptr; 107 108 NS_IMPL_ISUPPORTS(CacheIOThread, nsIThreadObserver) 109 110 CacheIOThread::CacheIOThread() { 111 for (auto& item : mQueueLength) { 112 item = 0; 113 } 114 115 sSelf = this; 116 } 117 118 CacheIOThread::~CacheIOThread() { 119 { 120 MonitorAutoLock lock(mMonitor); 121 MOZ_RELEASE_ASSERT(mShutdown); 122 } 123 124 if (mXPCOMThread) { 125 nsIThread* thread = mXPCOMThread; 126 thread->Release(); 127 } 128 129 sSelf = nullptr; 130 #ifdef DEBUG 131 for (auto& event : mEventQueue) { 132 MOZ_ASSERT(!event.Length()); 133 } 134 #endif 135 } 136 137 nsresult CacheIOThread::Init() { 138 { 139 MonitorAutoLock lock(mMonitor); 140 // Yeah, there is not a thread yet, but we want to make sure 141 // the sequencing is correct. 142 mNativeThreadHandle = MakeUnique<detail::NativeThreadHandle>(); 143 } 144 145 // Increase the reference count while spawning a new thread. 146 // If PR_CreateThread succeeds, we will forget this reference and the thread 147 // will be responsible to release it when it completes. 148 RefPtr<CacheIOThread> self = this; 149 mThread = 150 PR_CreateThread(PR_USER_THREAD, ThreadFunc, this, PR_PRIORITY_NORMAL, 151 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, 256 * 1024); 152 if (!mThread) { 153 // Treat this thread as already shutdown. 154 MonitorAutoLock lock(mMonitor); 155 mShutdown = true; 156 return NS_ERROR_FAILURE; 157 } 158 159 // IMPORTANT: The thread now owns this reference, so it's important that we 160 // leak it here, otherwise we'll end up with a bad refcount. 161 // See the dont_AddRef in ThreadFunc(). 162 self.forget().leak(); 163 164 return NS_OK; 165 } 166 167 nsresult CacheIOThread::Dispatch(nsIRunnable* aRunnable, uint32_t aLevel) { 168 return Dispatch(do_AddRef(aRunnable), aLevel); 169 } 170 171 nsresult CacheIOThread::Dispatch(already_AddRefed<nsIRunnable> aRunnable, 172 uint32_t aLevel) { 173 NS_ENSURE_ARG(aLevel < LAST_LEVEL); 174 175 nsCOMPtr<nsIRunnable> runnable(aRunnable); 176 177 // Runnable is always expected to be non-null, hard null-check below. 178 MOZ_ASSERT(runnable); 179 180 MonitorAutoLock lock(mMonitor); 181 182 if (mShutdown && (PR_GetCurrentThread() != mThread)) { 183 return NS_ERROR_UNEXPECTED; 184 } 185 186 return DispatchInternal(runnable.forget(), aLevel); 187 } 188 189 nsresult CacheIOThread::DispatchAfterPendingOpens(nsIRunnable* aRunnable) { 190 // Runnable is always expected to be non-null, hard null-check bellow. 191 MOZ_ASSERT(aRunnable); 192 193 MonitorAutoLock lock(mMonitor); 194 195 if (mShutdown && (PR_GetCurrentThread() != mThread)) { 196 return NS_ERROR_UNEXPECTED; 197 } 198 199 // Move everything from later executed OPEN level to the OPEN_PRIORITY level 200 // where we post the (eviction) runnable. 201 mQueueLength[OPEN_PRIORITY] += mEventQueue[OPEN].Length(); 202 mQueueLength[OPEN] -= mEventQueue[OPEN].Length(); 203 mEventQueue[OPEN_PRIORITY].AppendElements(mEventQueue[OPEN]); 204 mEventQueue[OPEN].Clear(); 205 206 return DispatchInternal(do_AddRef(aRunnable), OPEN_PRIORITY); 207 } 208 209 nsresult CacheIOThread::DispatchInternal( 210 already_AddRefed<nsIRunnable> aRunnable, uint32_t aLevel) { 211 nsCOMPtr<nsIRunnable> runnable(aRunnable); 212 213 LogRunnable::LogDispatch(runnable.get()); 214 215 if (NS_WARN_IF(!runnable)) return NS_ERROR_NULL_POINTER; 216 217 mMonitor.AssertCurrentThreadOwns(); 218 219 ++mQueueLength[aLevel]; 220 mEventQueue[aLevel].AppendElement(runnable.forget()); 221 if (mLowestLevelWaiting > aLevel) mLowestLevelWaiting = aLevel; 222 223 mMonitor.NotifyAll(); 224 225 return NS_OK; 226 } 227 228 bool CacheIOThread::IsCurrentThread() { 229 return mThread == PR_GetCurrentThread(); 230 } 231 232 uint32_t CacheIOThread::QueueSize(bool highPriority) { 233 MonitorAutoLock lock(mMonitor); 234 if (highPriority) { 235 return mQueueLength[OPEN_PRIORITY] + mQueueLength[READ_PRIORITY]; 236 } 237 238 return mQueueLength[OPEN_PRIORITY] + mQueueLength[READ_PRIORITY] + 239 mQueueLength[MANAGEMENT] + mQueueLength[OPEN] + mQueueLength[READ]; 240 } 241 242 bool CacheIOThread::YieldInternal() { 243 if (!IsCurrentThread()) { 244 NS_WARNING( 245 "Trying to yield to priority events on non-cache2 I/O thread? " 246 "You probably do something wrong."); 247 return false; 248 } 249 250 if (mCurrentlyExecutingLevel == XPCOM_LEVEL) { 251 // Doesn't make any sense, since this handler is the one 252 // that would be executed as the next one. 253 return false; 254 } 255 256 if (!EventsPending(mCurrentlyExecutingLevel)) return false; 257 258 mRerunCurrentEvent = true; 259 return true; 260 } 261 262 void CacheIOThread::Shutdown() { 263 if (!mThread) { 264 return; 265 } 266 267 { 268 MonitorAutoLock lock(mMonitor); 269 mShutdown = true; 270 mMonitor.NotifyAll(); 271 } 272 273 PR_JoinThread(mThread); 274 mThread = nullptr; 275 } 276 277 void CacheIOThread::CancelBlockingIO() { 278 // This is an attempt to cancel any blocking I/O operation taking 279 // too long time. 280 if (!mNativeThreadHandle) { 281 return; 282 } 283 284 if (!mIOCancelableEvents) { 285 LOG(("CacheIOThread::CancelBlockingIO, no blocking operation to cancel")); 286 return; 287 } 288 289 // OK, when we are here, we are processing an IO on the thread that 290 // can be cancelled. 291 mNativeThreadHandle->CancelBlockingIO(mMonitor); 292 } 293 294 already_AddRefed<nsIEventTarget> CacheIOThread::Target() { 295 nsCOMPtr<nsIEventTarget> target; 296 297 target = mXPCOMThread; 298 if (!target && mThread) { 299 MonitorAutoLock lock(mMonitor); 300 while (!mXPCOMThread) { 301 lock.Wait(); 302 } 303 304 target = mXPCOMThread; 305 } 306 307 return target.forget(); 308 } 309 310 // static 311 void CacheIOThread::ThreadFunc(void* aClosure) { 312 NS_SetCurrentThreadName("Cache2 I/O"); 313 314 mozilla::IOInterposer::RegisterCurrentThread(); 315 // We hold on to this reference for the duration of the thread. 316 RefPtr<CacheIOThread> thread = 317 dont_AddRef(static_cast<CacheIOThread*>(aClosure)); 318 thread->ThreadFunc(); 319 mozilla::IOInterposer::UnregisterCurrentThread(); 320 } 321 322 void CacheIOThread::ThreadFunc() { 323 char stackTop; 324 nsCOMPtr<nsIThreadInternal> threadInternal; 325 326 { 327 MonitorAutoLock lock(mMonitor); 328 329 MOZ_ASSERT(mNativeThreadHandle); 330 mNativeThreadHandle->InitThread(); 331 332 auto queue = 333 MakeRefPtr<ThreadEventQueue>(MakeUnique<mozilla::EventQueue>()); 334 nsCOMPtr<nsIThread> xpcomThread = 335 nsThreadManager::get().CreateCurrentThread(queue); 336 #if defined(MOZ_GECKO_PROFILER) 337 profiler_register_thread("Cache2 I/O", &stackTop); 338 #endif 339 340 threadInternal = do_QueryInterface(xpcomThread); 341 if (threadInternal) threadInternal->SetObserver(this); 342 343 mXPCOMThread = xpcomThread.forget().take(); 344 nsCOMPtr<nsIThread> thread = NS_GetCurrentThread(); 345 346 lock.NotifyAll(); 347 348 do { 349 loopStart: 350 // Reset the lowest level now, so that we can detect a new event on 351 // a lower level (i.e. higher priority) has been scheduled while 352 // executing any previously scheduled event. 353 mLowestLevelWaiting = LAST_LEVEL; 354 355 // Process xpcom events first 356 while (mHasXPCOMEvents) { 357 mHasXPCOMEvents = false; 358 mCurrentlyExecutingLevel = XPCOM_LEVEL; 359 360 MonitorAutoUnlock unlock(mMonitor); 361 362 bool processedEvent; 363 nsresult rv; 364 do { 365 rv = thread->ProcessNextEvent(false, &processedEvent); 366 367 ++mEventCounter; 368 MOZ_ASSERT(mNativeThreadHandle); 369 } while (NS_SUCCEEDED(rv) && processedEvent); 370 } 371 372 uint32_t level; 373 for (level = 0; level < LAST_LEVEL; ++level) { 374 if (!mEventQueue[level].Length()) { 375 // no events on this level, go to the next level 376 continue; 377 } 378 379 LoopOneLevel(level); 380 381 // Go to the first (lowest) level again 382 goto loopStart; 383 } 384 385 if (EventsPending()) { 386 continue; 387 } 388 389 if (mShutdown) { 390 break; 391 } 392 393 AUTO_PROFILER_LABEL("CacheIOThread::ThreadFunc::Wait", IDLE); 394 lock.Wait(); 395 396 } while (true); 397 398 MOZ_ASSERT(!EventsPending()); 399 400 #ifdef DEBUG 401 // This is for correct assertion on XPCOM events dispatch. 402 mInsideLoop = false; 403 #endif 404 } // lock 405 406 if (threadInternal) threadInternal->SetObserver(nullptr); 407 #if defined(MOZ_GECKO_PROFILER) 408 profiler_unregister_thread(); 409 #endif 410 } 411 412 void CacheIOThread::LoopOneLevel(uint32_t aLevel) { 413 mMonitor.AssertCurrentThreadOwns(); 414 EventQueue events = std::move(mEventQueue[aLevel]); 415 EventQueue::size_type length = events.Length(); 416 417 mCurrentlyExecutingLevel = aLevel; 418 419 bool returnEvents = false; 420 421 EventQueue::size_type index; 422 { 423 MonitorAutoUnlock unlock(mMonitor); 424 425 for (index = 0; index < length; ++index) { 426 if (EventsPending(aLevel)) { 427 // Somebody scheduled a new event on a lower level, break and harry 428 // to execute it! Don't forget to return what we haven't exec. 429 returnEvents = true; 430 break; 431 } 432 433 // Drop any previous flagging, only an event on the current level may set 434 // this flag. 435 mRerunCurrentEvent = false; 436 437 LogRunnable::Run log(events[index].get()); 438 439 events[index]->Run(); 440 441 MOZ_ASSERT(mNativeThreadHandle); 442 443 if (mRerunCurrentEvent) { 444 // The event handler yields to higher priority events and wants to 445 // rerun. 446 log.WillRunAgain(); 447 returnEvents = true; 448 break; 449 } 450 451 ++mEventCounter; 452 --mQueueLength[aLevel]; 453 454 // Release outside the lock. 455 events[index] = nullptr; 456 } 457 } 458 459 if (returnEvents) { 460 // This code must prevent any AddRef/Release calls on the stored COMPtrs as 461 // it might be exhaustive and block the monitor's lock for an excessive 462 // amout of time. 463 464 // 'index' points at the event that was interrupted and asked for re-run, 465 // all events before have run, been nullified, and can be removed. 466 events.RemoveElementsAt(0, index); 467 // Move events that might have been scheduled on this queue to the tail to 468 // preserve the expected per-queue FIFO order. 469 // XXX(Bug 1631371) Check if this should use a fallible operation as it 470 // pretended earlier. 471 events.AppendElements(std::move(mEventQueue[aLevel])); 472 // And finally move everything back to the main queue. 473 mEventQueue[aLevel] = std::move(events); 474 } 475 } 476 477 bool CacheIOThread::EventsPending(uint32_t aLastLevel) { 478 return mLowestLevelWaiting < aLastLevel || mHasXPCOMEvents; 479 } 480 481 NS_IMETHODIMP CacheIOThread::OnDispatchedEvent() { 482 MonitorAutoLock lock(mMonitor); 483 mHasXPCOMEvents = true; 484 MOZ_ASSERT(mInsideLoop); 485 lock.Notify(); 486 return NS_OK; 487 } 488 489 NS_IMETHODIMP CacheIOThread::OnProcessNextEvent(nsIThreadInternal* thread, 490 bool mayWait) { 491 return NS_OK; 492 } 493 494 NS_IMETHODIMP CacheIOThread::AfterProcessNextEvent(nsIThreadInternal* thread, 495 bool eventWasProcessed) { 496 return NS_OK; 497 } 498 499 // Memory reporting 500 501 size_t CacheIOThread::SizeOfExcludingThis( 502 mozilla::MallocSizeOf mallocSizeOf) const { 503 MonitorAutoLock lock(const_cast<CacheIOThread*>(this)->mMonitor); 504 505 size_t n = 0; 506 for (const auto& event : mEventQueue) { 507 n += event.ShallowSizeOfExcludingThis(mallocSizeOf); 508 // Events referenced by the queues are arbitrary objects we cannot be sure 509 // aren't reported elsewhere. Deliberately omitting them from reporting. 510 } 511 512 return n; 513 } 514 515 size_t CacheIOThread::SizeOfIncludingThis( 516 mozilla::MallocSizeOf mallocSizeOf) const { 517 return mallocSizeOf(this) + SizeOfExcludingThis(mallocSizeOf); 518 } 519 520 CacheIOThread::Cancelable::Cancelable(bool aCancelable) 521 : mCancelable(aCancelable) { 522 // This will only ever be used on the I/O thread, 523 // which is expected to be alive longer than this class. 524 MOZ_ASSERT(CacheIOThread::sSelf); 525 MOZ_ASSERT(CacheIOThread::sSelf->IsCurrentThread()); 526 527 if (mCancelable) { 528 ++CacheIOThread::sSelf->mIOCancelableEvents; 529 } 530 } 531 532 CacheIOThread::Cancelable::~Cancelable() { 533 MOZ_ASSERT(CacheIOThread::sSelf); 534 535 if (mCancelable) { 536 --CacheIOThread::sSelf->mIOCancelableEvents; 537 } 538 } 539 540 } // namespace mozilla::net