message_pump_kqueue.cc (13626B)
1 // Copyright 2019 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/message_pump_kqueue.h" 6 7 #include <sys/errno.h> 8 9 #ifdef XP_IOS 10 # include <BrowserEngineCore/BEkevent.h> 11 #endif 12 13 #include "mozilla/AutoRestore.h" 14 15 #include "base/logging.h" 16 #include "base/scoped_nsautorelease_pool.h" 17 #include "base/eintr_wrapper.h" 18 19 namespace base { 20 21 namespace { 22 23 // On iOS, the normal `kevent64` method is blocked by the content process 24 // sandbox, so instead we use `be_kevent64` from the BrowserEngineCore library. 25 static int platform_kevent64(int fd, const kevent64_s* changelist, int nchanges, 26 kevent64_s* eventlist, int nevents, int flags) { 27 #ifdef XP_IOS 28 return be_kevent64(fd, changelist, nchanges, eventlist, nevents, flags); 29 #else 30 return kevent64(fd, changelist, nchanges, eventlist, nevents, flags, nullptr); 31 #endif 32 } 33 34 int ChangeOneEvent(const mozilla::UniqueFileHandle& kqueue, kevent64_s* event) { 35 return HANDLE_EINTR(platform_kevent64(kqueue.get(), event, 1, nullptr, 0, 0)); 36 } 37 38 } // namespace 39 40 MessagePumpKqueue::FileDescriptorWatcher::FileDescriptorWatcher() = default; 41 42 MessagePumpKqueue::FileDescriptorWatcher::~FileDescriptorWatcher() { 43 StopWatchingFileDescriptor(); 44 } 45 46 bool MessagePumpKqueue::FileDescriptorWatcher::StopWatchingFileDescriptor() { 47 if (!pump_) return true; 48 return pump_->StopWatchingFileDescriptor(this); 49 } 50 51 void MessagePumpKqueue::FileDescriptorWatcher::Init(MessagePumpKqueue* pump, 52 int fd, int mode, 53 Watcher* watcher) { 54 DCHECK_NE(fd, -1); 55 DCHECK(!watcher_); 56 DCHECK(watcher); 57 DCHECK(pump); 58 fd_ = fd; 59 mode_ = mode; 60 watcher_ = watcher; 61 pump_ = pump; 62 } 63 64 void MessagePumpKqueue::FileDescriptorWatcher::Reset() { 65 fd_ = -1; 66 mode_ = 0; 67 watcher_ = nullptr; 68 pump_ = nullptr; 69 } 70 71 MessagePumpKqueue::MachPortWatchController::MachPortWatchController() = default; 72 73 MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() { 74 StopWatchingMachPort(); 75 } 76 77 bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() { 78 if (!pump_) { 79 return true; 80 } 81 return pump_->StopWatchingMachPort(this); 82 } 83 84 void MessagePumpKqueue::MachPortWatchController::Init( 85 MessagePumpKqueue* pump, mach_port_t port, MachPortWatcher* watcher) { 86 DCHECK(!watcher_); 87 DCHECK(watcher); 88 DCHECK(pump); 89 port_ = port; 90 watcher_ = watcher; 91 pump_ = pump; 92 } 93 94 void MessagePumpKqueue::MachPortWatchController::Reset() { 95 port_ = MACH_PORT_NULL; 96 watcher_ = nullptr; 97 pump_ = nullptr; 98 } 99 100 MessagePumpKqueue::MessagePumpKqueue() : kqueue_(kqueue()) { 101 DCHECK(kqueue_) << "kqueue"; 102 103 // Create a Mach port that will be used to wake up the pump by sending 104 // a message in response to ScheduleWork(). This is significantly faster than 105 // using an EVFILT_USER event, especially when triggered across threads. 106 mach_port_t wakeup; 107 kern_return_t kr = 108 mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &wakeup); 109 wakeup_.reset(wakeup); 110 CHECK(kr == KERN_SUCCESS) 111 << "mach_port_allocate: " << mach_error_string(kr); 112 113 // Specify the wakeup port event to directly receive the Mach message as part 114 // of the kevent64() syscall. 115 kevent64_s event{}; 116 event.ident = wakeup_.get(); 117 event.filter = EVFILT_MACHPORT; 118 event.flags = EV_ADD; 119 event.fflags = MACH_RCV_MSG; 120 event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_); 121 event.ext[1] = sizeof(wakeup_buffer_); 122 123 int rv = ChangeOneEvent(kqueue_, &event); 124 DCHECK(rv == 0) << "kevent64"; 125 } 126 127 MessagePumpKqueue::~MessagePumpKqueue() = default; 128 129 void MessagePumpKqueue::Run(Delegate* delegate) { 130 mozilla::AutoRestore<bool> reset_keep_running(keep_running_); 131 keep_running_ = true; 132 133 while (keep_running_) { 134 ScopedNSAutoreleasePool pool; 135 136 bool do_more_work = DoInternalWork(delegate, nullptr); 137 if (!keep_running_) break; 138 139 do_more_work |= delegate->DoWork(); 140 if (!keep_running_) break; 141 142 TimeTicks delayed_work_time; 143 do_more_work |= delegate->DoDelayedWork(&delayed_work_time); 144 if (!keep_running_) break; 145 146 if (do_more_work) continue; 147 148 do_more_work |= delegate->DoIdleWork(); 149 if (!keep_running_) break; 150 151 if (do_more_work) continue; 152 153 DoInternalWork(delegate, &delayed_work_time); 154 } 155 } 156 157 void MessagePumpKqueue::Quit() { 158 keep_running_ = false; 159 ScheduleWork(); 160 } 161 162 void MessagePumpKqueue::ScheduleWork() { 163 mach_msg_empty_send_t message{}; 164 message.header.msgh_size = sizeof(message); 165 message.header.msgh_bits = 166 MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE); 167 message.header.msgh_remote_port = wakeup_.get(); 168 kern_return_t kr = mach_msg_send(&message.header); 169 if (kr != KERN_SUCCESS) { 170 // If ScheduleWork() is being called by other threads faster than the pump 171 // can dispatch work, the kernel message queue for the wakeup port can fill 172 // up (this happens under base_perftests, for example). The kernel does 173 // return a SEND_ONCE right in the case of failure, which must be destroyed 174 // to avoid leaking. 175 if ((kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER) { 176 DLOG(ERROR) << "mach_msg_send: " << mach_error_string(kr); 177 } 178 mach_msg_destroy(&message.header); 179 } 180 } 181 182 void MessagePumpKqueue::ScheduleDelayedWork( 183 const TimeTicks& delayed_work_time) { 184 // Nothing to do. This MessagePump uses DoWork(). 185 } 186 187 bool MessagePumpKqueue::WatchMachReceivePort( 188 mach_port_t port, MachPortWatchController* controller, 189 MachPortWatcher* delegate) { 190 DCHECK(port != MACH_PORT_NULL); 191 DCHECK(controller); 192 DCHECK(delegate); 193 194 if (controller->port() != MACH_PORT_NULL) { 195 DLOG(ERROR) 196 << "Cannot use the same MachPortWatchController while it is active"; 197 return false; 198 } 199 200 kevent64_s event{}; 201 event.ident = port; 202 event.filter = EVFILT_MACHPORT; 203 event.flags = EV_ADD; 204 int rv = ChangeOneEvent(kqueue_, &event); 205 if (rv < 0) { 206 DLOG(ERROR) << "kevent64"; 207 return false; 208 } 209 ++event_count_; 210 211 controller->Init(this, port, delegate); 212 port_controllers_.InsertOrUpdate(port, controller); 213 214 return true; 215 } 216 217 bool MessagePumpKqueue::WatchFileDescriptor(int fd, bool persistent, int mode, 218 FileDescriptorWatcher* controller, 219 Watcher* delegate) { 220 DCHECK_GE(fd, 0); 221 DCHECK(controller); 222 DCHECK(delegate); 223 DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0); 224 225 if (controller->fd() != -1 && controller->fd() != fd) { 226 DLOG(ERROR) 227 << "Cannot use the same FileDescriptorWatcher on two different FDs"; 228 return false; 229 } 230 StopWatchingFileDescriptor(controller); 231 232 AutoTArray<kevent64_s, 2> events; 233 234 kevent64_s base_event{}; 235 base_event.ident = static_cast<uint64_t>(fd); 236 base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0); 237 238 if (mode & Mode::WATCH_READ) { 239 base_event.filter = EVFILT_READ; 240 CHECK(next_fd_controller_id_ < std::numeric_limits<uint64_t>::max()); 241 base_event.udata = next_fd_controller_id_++; 242 fd_controllers_.InsertOrUpdate(base_event.udata, controller); 243 events.AppendElement(base_event); 244 } 245 if (mode & Mode::WATCH_WRITE) { 246 base_event.filter = EVFILT_WRITE; 247 CHECK(next_fd_controller_id_ < std::numeric_limits<uint64_t>::max()); 248 base_event.udata = next_fd_controller_id_++; 249 fd_controllers_.InsertOrUpdate(base_event.udata, controller); 250 events.AppendElement(base_event); 251 } 252 253 int rv = HANDLE_EINTR(platform_kevent64(kqueue_.get(), events.Elements(), 254 events.Length(), nullptr, 0, 0)); 255 if (rv < 0) { 256 DLOG(ERROR) << "WatchFileDescriptor kevent64"; 257 return false; 258 } 259 260 event_count_ += events.Length(); 261 controller->Init(this, fd, mode, delegate); 262 263 return true; 264 } 265 266 bool MessagePumpKqueue::StopWatchingMachPort( 267 MachPortWatchController* controller) { 268 mach_port_t port = controller->port(); 269 controller->Reset(); 270 port_controllers_.Remove(port); 271 272 kevent64_s event{}; 273 event.ident = port; 274 event.filter = EVFILT_MACHPORT; 275 event.flags = EV_DELETE; 276 --event_count_; 277 int rv = ChangeOneEvent(kqueue_, &event); 278 if (rv < 0) { 279 DLOG(ERROR) << "kevent64"; 280 return false; 281 } 282 283 return true; 284 } 285 286 bool MessagePumpKqueue::StopWatchingFileDescriptor( 287 FileDescriptorWatcher* controller) { 288 int fd = controller->fd(); 289 int mode = controller->mode(); 290 controller->Reset(); 291 292 if (fd < 0) return true; 293 294 AutoTArray<kevent64_s, 2> events; 295 296 kevent64_s base_event{}; 297 base_event.ident = static_cast<uint64_t>(fd); 298 base_event.flags = EV_DELETE; 299 300 if (mode & Mode::WATCH_READ) { 301 base_event.filter = EVFILT_READ; 302 events.AppendElement(base_event); 303 } 304 if (mode & Mode::WATCH_WRITE) { 305 base_event.filter = EVFILT_WRITE; 306 events.AppendElement(base_event); 307 } 308 309 int rv = HANDLE_EINTR(platform_kevent64(kqueue_.get(), events.Elements(), 310 events.Length(), nullptr, 0, 0)); 311 if (rv < 0) DLOG(ERROR) << "StopWatchingFileDescriptor kevent64"; 312 313 // The keys for the IDMap aren't recorded anywhere (they're attached to the 314 // kevent object in the kernel), so locate the entries by controller pointer. 315 fd_controllers_.RemoveIf([&](auto& it) { return it.Data() == controller; }); 316 317 event_count_ -= events.Length(); 318 319 return rv >= 0; 320 } 321 322 bool MessagePumpKqueue::DoInternalWork(Delegate* delegate, 323 TimeTicks* delayed_work_time) { 324 if (events_.size() < event_count_) { 325 events_.resize(event_count_); 326 } 327 328 bool poll = delayed_work_time == nullptr; 329 int flags = poll ? KEVENT_FLAG_IMMEDIATE : 0; 330 if (!poll && delayed_work_time_ != *delayed_work_time) { 331 UpdateWakeupTimer(*delayed_work_time); 332 DCHECK(delayed_work_time_ == *delayed_work_time); 333 } 334 335 int rv = HANDLE_EINTR(platform_kevent64( 336 kqueue_.get(), nullptr, 0, events_.data(), events_.size(), flags)); 337 338 CHECK(rv >= 0) 339 << "kevent64"; 340 return ProcessEvents(delegate, rv); 341 } 342 343 bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, int count) { 344 bool did_work = false; 345 346 for (int i = 0; i < count; ++i) { 347 auto* event = &events_[i]; 348 if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) { 349 did_work = true; 350 351 FileDescriptorWatcher* controller = fd_controllers_.Get(event->udata); 352 if (!controller) { 353 // The controller was removed by some other work callout before 354 // this event could be processed. 355 continue; 356 } 357 Watcher* fd_watcher = controller->watcher(); 358 359 if (event->flags & EV_ONESHOT) { 360 // If this was a one-shot event, the Controller needs to stop tracking 361 // the descriptor, so it is not double-removed when it is told to stop 362 // watching. 363 controller->Reset(); 364 fd_controllers_.Remove(event->udata); 365 --event_count_; 366 } 367 368 if (fd_watcher) { 369 if (event->filter == EVFILT_READ) { 370 fd_watcher->OnFileCanReadWithoutBlocking( 371 static_cast<int>(event->ident)); 372 } else if (event->filter == EVFILT_WRITE) { 373 fd_watcher->OnFileCanWriteWithoutBlocking( 374 static_cast<int>(event->ident)); 375 } 376 } 377 } else if (event->filter == EVFILT_MACHPORT) { 378 mach_port_t port = event->ident; 379 380 if (port == wakeup_.get()) { 381 // The wakeup event has been received, do not treat this as "doing 382 // work", this just wakes up the pump. 383 continue; 384 } 385 386 did_work = true; 387 388 MachPortWatchController* controller = port_controllers_.Get(port); 389 // The controller could have been removed by some other work callout 390 // before this event could be processed. 391 if (controller) { 392 controller->watcher()->OnMachMessageReceived(port); 393 } 394 } else if (event->filter == EVFILT_TIMER) { 395 // The wakeup timer fired. 396 DCHECK(!delayed_work_time_.is_null()); 397 delayed_work_time_ = base::TimeTicks(); 398 --event_count_; 399 } else { 400 NOTREACHED() << "Unexpected event for filter " << event->filter; 401 } 402 } 403 404 return did_work; 405 } 406 407 void MessagePumpKqueue::UpdateWakeupTimer(const base::TimeTicks& wakeup_time) { 408 DCHECK_NE(wakeup_time, delayed_work_time_); 409 410 // The ident of the wakeup timer. There's only the one timer as the pair 411 // (ident, filter) is the identity of the event. 412 constexpr uint64_t kWakeupTimerIdent = 0x0; 413 if (wakeup_time.is_null()) { 414 // Clear the timer. 415 kevent64_s timer{}; 416 timer.ident = kWakeupTimerIdent; 417 timer.filter = EVFILT_TIMER; 418 timer.flags = EV_DELETE; 419 420 int rv = ChangeOneEvent(kqueue_, &timer); 421 CHECK(rv == 0) 422 << "kevent64, delete timer"; 423 --event_count_; 424 } else { 425 // Set/reset the timer. 426 kevent64_s timer{}; 427 timer.ident = kWakeupTimerIdent; 428 timer.filter = EVFILT_TIMER; 429 // This updates the timer if it already exists in |kqueue_|. 430 timer.flags = EV_ADD | EV_ONESHOT; 431 432 // Specify the sleep in microseconds to avoid undersleeping due to 433 // numeric problems. 434 // If wakeup_time is in the past, the delta below will be negative and the 435 // timer is set immediately. 436 timer.fflags = NOTE_USECONDS; 437 timer.data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds(); 438 439 int rv = ChangeOneEvent(kqueue_, &timer); 440 CHECK(rv == 0) 441 << "kevent64, set timer"; 442 443 // Bump the event count if we just added the timer. 444 if (delayed_work_time_.is_null()) ++event_count_; 445 } 446 447 delayed_work_time_ = wakeup_time; 448 } 449 450 } // namespace base