tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

message_pump_kqueue.cc (13626B)


      1 // Copyright 2019 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/message_pump_kqueue.h"
      6 
      7 #include <sys/errno.h>
      8 
      9 #ifdef XP_IOS
     10 #  include <BrowserEngineCore/BEkevent.h>
     11 #endif
     12 
     13 #include "mozilla/AutoRestore.h"
     14 
     15 #include "base/logging.h"
     16 #include "base/scoped_nsautorelease_pool.h"
     17 #include "base/eintr_wrapper.h"
     18 
     19 namespace base {
     20 
     21 namespace {
     22 
     23 // On iOS, the normal `kevent64` method is blocked by the content process
     24 // sandbox, so instead we use `be_kevent64` from the BrowserEngineCore library.
     25 static int platform_kevent64(int fd, const kevent64_s* changelist, int nchanges,
     26                             kevent64_s* eventlist, int nevents, int flags) {
     27 #ifdef XP_IOS
     28  return be_kevent64(fd, changelist, nchanges, eventlist, nevents, flags);
     29 #else
     30  return kevent64(fd, changelist, nchanges, eventlist, nevents, flags, nullptr);
     31 #endif
     32 }
     33 
     34 int ChangeOneEvent(const mozilla::UniqueFileHandle& kqueue, kevent64_s* event) {
     35  return HANDLE_EINTR(platform_kevent64(kqueue.get(), event, 1, nullptr, 0, 0));
     36 }
     37 
     38 }  // namespace
     39 
     40 MessagePumpKqueue::FileDescriptorWatcher::FileDescriptorWatcher() = default;
     41 
     42 MessagePumpKqueue::FileDescriptorWatcher::~FileDescriptorWatcher() {
     43  StopWatchingFileDescriptor();
     44 }
     45 
     46 bool MessagePumpKqueue::FileDescriptorWatcher::StopWatchingFileDescriptor() {
     47  if (!pump_) return true;
     48  return pump_->StopWatchingFileDescriptor(this);
     49 }
     50 
     51 void MessagePumpKqueue::FileDescriptorWatcher::Init(MessagePumpKqueue* pump,
     52                                                    int fd, int mode,
     53                                                    Watcher* watcher) {
     54  DCHECK_NE(fd, -1);
     55  DCHECK(!watcher_);
     56  DCHECK(watcher);
     57  DCHECK(pump);
     58  fd_ = fd;
     59  mode_ = mode;
     60  watcher_ = watcher;
     61  pump_ = pump;
     62 }
     63 
     64 void MessagePumpKqueue::FileDescriptorWatcher::Reset() {
     65  fd_ = -1;
     66  mode_ = 0;
     67  watcher_ = nullptr;
     68  pump_ = nullptr;
     69 }
     70 
     71 MessagePumpKqueue::MachPortWatchController::MachPortWatchController() = default;
     72 
     73 MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() {
     74  StopWatchingMachPort();
     75 }
     76 
     77 bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() {
     78  if (!pump_) {
     79    return true;
     80  }
     81  return pump_->StopWatchingMachPort(this);
     82 }
     83 
     84 void MessagePumpKqueue::MachPortWatchController::Init(
     85    MessagePumpKqueue* pump, mach_port_t port, MachPortWatcher* watcher) {
     86  DCHECK(!watcher_);
     87  DCHECK(watcher);
     88  DCHECK(pump);
     89  port_ = port;
     90  watcher_ = watcher;
     91  pump_ = pump;
     92 }
     93 
     94 void MessagePumpKqueue::MachPortWatchController::Reset() {
     95  port_ = MACH_PORT_NULL;
     96  watcher_ = nullptr;
     97  pump_ = nullptr;
     98 }
     99 
    100 MessagePumpKqueue::MessagePumpKqueue() : kqueue_(kqueue()) {
    101  DCHECK(kqueue_) << "kqueue";
    102 
    103  // Create a Mach port that will be used to wake up the pump by sending
    104  // a message in response to ScheduleWork(). This is significantly faster than
    105  // using an EVFILT_USER event, especially when triggered across threads.
    106  mach_port_t wakeup;
    107  kern_return_t kr =
    108      mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &wakeup);
    109  wakeup_.reset(wakeup);
    110  CHECK(kr == KERN_SUCCESS)
    111  << "mach_port_allocate: " << mach_error_string(kr);
    112 
    113  // Specify the wakeup port event to directly receive the Mach message as part
    114  // of the kevent64() syscall.
    115  kevent64_s event{};
    116  event.ident = wakeup_.get();
    117  event.filter = EVFILT_MACHPORT;
    118  event.flags = EV_ADD;
    119  event.fflags = MACH_RCV_MSG;
    120  event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_);
    121  event.ext[1] = sizeof(wakeup_buffer_);
    122 
    123  int rv = ChangeOneEvent(kqueue_, &event);
    124  DCHECK(rv == 0) << "kevent64";
    125 }
    126 
    127 MessagePumpKqueue::~MessagePumpKqueue() = default;
    128 
    129 void MessagePumpKqueue::Run(Delegate* delegate) {
    130  mozilla::AutoRestore<bool> reset_keep_running(keep_running_);
    131  keep_running_ = true;
    132 
    133  while (keep_running_) {
    134    ScopedNSAutoreleasePool pool;
    135 
    136    bool do_more_work = DoInternalWork(delegate, nullptr);
    137    if (!keep_running_) break;
    138 
    139    do_more_work |= delegate->DoWork();
    140    if (!keep_running_) break;
    141 
    142    TimeTicks delayed_work_time;
    143    do_more_work |= delegate->DoDelayedWork(&delayed_work_time);
    144    if (!keep_running_) break;
    145 
    146    if (do_more_work) continue;
    147 
    148    do_more_work |= delegate->DoIdleWork();
    149    if (!keep_running_) break;
    150 
    151    if (do_more_work) continue;
    152 
    153    DoInternalWork(delegate, &delayed_work_time);
    154  }
    155 }
    156 
    157 void MessagePumpKqueue::Quit() {
    158  keep_running_ = false;
    159  ScheduleWork();
    160 }
    161 
    162 void MessagePumpKqueue::ScheduleWork() {
    163  mach_msg_empty_send_t message{};
    164  message.header.msgh_size = sizeof(message);
    165  message.header.msgh_bits =
    166      MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
    167  message.header.msgh_remote_port = wakeup_.get();
    168  kern_return_t kr = mach_msg_send(&message.header);
    169  if (kr != KERN_SUCCESS) {
    170    // If ScheduleWork() is being called by other threads faster than the pump
    171    // can dispatch work, the kernel message queue for the wakeup port can fill
    172    // up (this happens under base_perftests, for example). The kernel does
    173    // return a SEND_ONCE right in the case of failure, which must be destroyed
    174    // to avoid leaking.
    175    if ((kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER) {
    176      DLOG(ERROR) << "mach_msg_send: " << mach_error_string(kr);
    177    }
    178    mach_msg_destroy(&message.header);
    179  }
    180 }
    181 
    182 void MessagePumpKqueue::ScheduleDelayedWork(
    183    const TimeTicks& delayed_work_time) {
    184  // Nothing to do. This MessagePump uses DoWork().
    185 }
    186 
    187 bool MessagePumpKqueue::WatchMachReceivePort(
    188    mach_port_t port, MachPortWatchController* controller,
    189    MachPortWatcher* delegate) {
    190  DCHECK(port != MACH_PORT_NULL);
    191  DCHECK(controller);
    192  DCHECK(delegate);
    193 
    194  if (controller->port() != MACH_PORT_NULL) {
    195    DLOG(ERROR)
    196        << "Cannot use the same MachPortWatchController while it is active";
    197    return false;
    198  }
    199 
    200  kevent64_s event{};
    201  event.ident = port;
    202  event.filter = EVFILT_MACHPORT;
    203  event.flags = EV_ADD;
    204  int rv = ChangeOneEvent(kqueue_, &event);
    205  if (rv < 0) {
    206    DLOG(ERROR) << "kevent64";
    207    return false;
    208  }
    209  ++event_count_;
    210 
    211  controller->Init(this, port, delegate);
    212  port_controllers_.InsertOrUpdate(port, controller);
    213 
    214  return true;
    215 }
    216 
    217 bool MessagePumpKqueue::WatchFileDescriptor(int fd, bool persistent, int mode,
    218                                            FileDescriptorWatcher* controller,
    219                                            Watcher* delegate) {
    220  DCHECK_GE(fd, 0);
    221  DCHECK(controller);
    222  DCHECK(delegate);
    223  DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0);
    224 
    225  if (controller->fd() != -1 && controller->fd() != fd) {
    226    DLOG(ERROR)
    227        << "Cannot use the same FileDescriptorWatcher on two different FDs";
    228    return false;
    229  }
    230  StopWatchingFileDescriptor(controller);
    231 
    232  AutoTArray<kevent64_s, 2> events;
    233 
    234  kevent64_s base_event{};
    235  base_event.ident = static_cast<uint64_t>(fd);
    236  base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0);
    237 
    238  if (mode & Mode::WATCH_READ) {
    239    base_event.filter = EVFILT_READ;
    240    CHECK(next_fd_controller_id_ < std::numeric_limits<uint64_t>::max());
    241    base_event.udata = next_fd_controller_id_++;
    242    fd_controllers_.InsertOrUpdate(base_event.udata, controller);
    243    events.AppendElement(base_event);
    244  }
    245  if (mode & Mode::WATCH_WRITE) {
    246    base_event.filter = EVFILT_WRITE;
    247    CHECK(next_fd_controller_id_ < std::numeric_limits<uint64_t>::max());
    248    base_event.udata = next_fd_controller_id_++;
    249    fd_controllers_.InsertOrUpdate(base_event.udata, controller);
    250    events.AppendElement(base_event);
    251  }
    252 
    253  int rv = HANDLE_EINTR(platform_kevent64(kqueue_.get(), events.Elements(),
    254                                          events.Length(), nullptr, 0, 0));
    255  if (rv < 0) {
    256    DLOG(ERROR) << "WatchFileDescriptor kevent64";
    257    return false;
    258  }
    259 
    260  event_count_ += events.Length();
    261  controller->Init(this, fd, mode, delegate);
    262 
    263  return true;
    264 }
    265 
    266 bool MessagePumpKqueue::StopWatchingMachPort(
    267    MachPortWatchController* controller) {
    268  mach_port_t port = controller->port();
    269  controller->Reset();
    270  port_controllers_.Remove(port);
    271 
    272  kevent64_s event{};
    273  event.ident = port;
    274  event.filter = EVFILT_MACHPORT;
    275  event.flags = EV_DELETE;
    276  --event_count_;
    277  int rv = ChangeOneEvent(kqueue_, &event);
    278  if (rv < 0) {
    279    DLOG(ERROR) << "kevent64";
    280    return false;
    281  }
    282 
    283  return true;
    284 }
    285 
    286 bool MessagePumpKqueue::StopWatchingFileDescriptor(
    287    FileDescriptorWatcher* controller) {
    288  int fd = controller->fd();
    289  int mode = controller->mode();
    290  controller->Reset();
    291 
    292  if (fd < 0) return true;
    293 
    294  AutoTArray<kevent64_s, 2> events;
    295 
    296  kevent64_s base_event{};
    297  base_event.ident = static_cast<uint64_t>(fd);
    298  base_event.flags = EV_DELETE;
    299 
    300  if (mode & Mode::WATCH_READ) {
    301    base_event.filter = EVFILT_READ;
    302    events.AppendElement(base_event);
    303  }
    304  if (mode & Mode::WATCH_WRITE) {
    305    base_event.filter = EVFILT_WRITE;
    306    events.AppendElement(base_event);
    307  }
    308 
    309  int rv = HANDLE_EINTR(platform_kevent64(kqueue_.get(), events.Elements(),
    310                                          events.Length(), nullptr, 0, 0));
    311  if (rv < 0) DLOG(ERROR) << "StopWatchingFileDescriptor kevent64";
    312 
    313  // The keys for the IDMap aren't recorded anywhere (they're attached to the
    314  // kevent object in the kernel), so locate the entries by controller pointer.
    315  fd_controllers_.RemoveIf([&](auto& it) { return it.Data() == controller; });
    316 
    317  event_count_ -= events.Length();
    318 
    319  return rv >= 0;
    320 }
    321 
    322 bool MessagePumpKqueue::DoInternalWork(Delegate* delegate,
    323                                       TimeTicks* delayed_work_time) {
    324  if (events_.size() < event_count_) {
    325    events_.resize(event_count_);
    326  }
    327 
    328  bool poll = delayed_work_time == nullptr;
    329  int flags = poll ? KEVENT_FLAG_IMMEDIATE : 0;
    330  if (!poll && delayed_work_time_ != *delayed_work_time) {
    331    UpdateWakeupTimer(*delayed_work_time);
    332    DCHECK(delayed_work_time_ == *delayed_work_time);
    333  }
    334 
    335  int rv = HANDLE_EINTR(platform_kevent64(
    336      kqueue_.get(), nullptr, 0, events_.data(), events_.size(), flags));
    337 
    338  CHECK(rv >= 0)
    339  << "kevent64";
    340  return ProcessEvents(delegate, rv);
    341 }
    342 
    343 bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, int count) {
    344  bool did_work = false;
    345 
    346  for (int i = 0; i < count; ++i) {
    347    auto* event = &events_[i];
    348    if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) {
    349      did_work = true;
    350 
    351      FileDescriptorWatcher* controller = fd_controllers_.Get(event->udata);
    352      if (!controller) {
    353        // The controller was removed by some other work callout before
    354        // this event could be processed.
    355        continue;
    356      }
    357      Watcher* fd_watcher = controller->watcher();
    358 
    359      if (event->flags & EV_ONESHOT) {
    360        // If this was a one-shot event, the Controller needs to stop tracking
    361        // the descriptor, so it is not double-removed when it is told to stop
    362        // watching.
    363        controller->Reset();
    364        fd_controllers_.Remove(event->udata);
    365        --event_count_;
    366      }
    367 
    368      if (fd_watcher) {
    369        if (event->filter == EVFILT_READ) {
    370          fd_watcher->OnFileCanReadWithoutBlocking(
    371              static_cast<int>(event->ident));
    372        } else if (event->filter == EVFILT_WRITE) {
    373          fd_watcher->OnFileCanWriteWithoutBlocking(
    374              static_cast<int>(event->ident));
    375        }
    376      }
    377    } else if (event->filter == EVFILT_MACHPORT) {
    378      mach_port_t port = event->ident;
    379 
    380      if (port == wakeup_.get()) {
    381        // The wakeup event has been received, do not treat this as "doing
    382        // work", this just wakes up the pump.
    383        continue;
    384      }
    385 
    386      did_work = true;
    387 
    388      MachPortWatchController* controller = port_controllers_.Get(port);
    389      // The controller could have been removed by some other work callout
    390      // before this event could be processed.
    391      if (controller) {
    392        controller->watcher()->OnMachMessageReceived(port);
    393      }
    394    } else if (event->filter == EVFILT_TIMER) {
    395      // The wakeup timer fired.
    396      DCHECK(!delayed_work_time_.is_null());
    397      delayed_work_time_ = base::TimeTicks();
    398      --event_count_;
    399    } else {
    400      NOTREACHED() << "Unexpected event for filter " << event->filter;
    401    }
    402  }
    403 
    404  return did_work;
    405 }
    406 
    407 void MessagePumpKqueue::UpdateWakeupTimer(const base::TimeTicks& wakeup_time) {
    408  DCHECK_NE(wakeup_time, delayed_work_time_);
    409 
    410  // The ident of the wakeup timer. There's only the one timer as the pair
    411  // (ident, filter) is the identity of the event.
    412  constexpr uint64_t kWakeupTimerIdent = 0x0;
    413  if (wakeup_time.is_null()) {
    414    // Clear the timer.
    415    kevent64_s timer{};
    416    timer.ident = kWakeupTimerIdent;
    417    timer.filter = EVFILT_TIMER;
    418    timer.flags = EV_DELETE;
    419 
    420    int rv = ChangeOneEvent(kqueue_, &timer);
    421    CHECK(rv == 0)
    422    << "kevent64, delete timer";
    423    --event_count_;
    424  } else {
    425    // Set/reset the timer.
    426    kevent64_s timer{};
    427    timer.ident = kWakeupTimerIdent;
    428    timer.filter = EVFILT_TIMER;
    429    // This updates the timer if it already exists in |kqueue_|.
    430    timer.flags = EV_ADD | EV_ONESHOT;
    431 
    432    // Specify the sleep in microseconds to avoid undersleeping due to
    433    // numeric problems.
    434    // If wakeup_time is in the past, the delta below will be negative and the
    435    // timer is set immediately.
    436    timer.fflags = NOTE_USECONDS;
    437    timer.data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds();
    438 
    439    int rv = ChangeOneEvent(kqueue_, &timer);
    440    CHECK(rv == 0)
    441    << "kevent64, set timer";
    442 
    443    // Bump the event count if we just added the timer.
    444    if (delayed_work_time_.is_null()) ++event_count_;
    445  }
    446 
    447  delayed_work_time_ = wakeup_time;
    448 }
    449 
    450 }  // namespace base