message_pump_libevent.cc (10578B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 // Copyright (c) 2008 The Chromium Authors. All rights reserved. 4 // Use of this source code is governed by a BSD-style license that can be 5 // found in the LICENSE file. 6 7 #include "base/message_pump_libevent.h" 8 9 #include <errno.h> 10 #include <fcntl.h> 11 #if defined(ANDROID) || defined(XP_UNIX) 12 # include <unistd.h> 13 #endif 14 15 #include "eintr_wrapper.h" 16 #include "base/logging.h" 17 #include "base/scoped_nsautorelease_pool.h" 18 #include "base/time.h" 19 #include "event.h" 20 #include "mozilla/ProfilerLabels.h" 21 #include "mozilla/ProfilerThreadSleep.h" 22 #include "mozilla/UniquePtr.h" 23 24 // This macro checks that the _EVENT_SIZEOF_* constants defined in 25 // ipc/chromiume/src/third_party/<platform>/event2/event-config.h are correct. 26 #if defined(_EVENT_SIZEOF_SHORT) 27 # define CHECK_EVENT_SIZEOF(TYPE, type) \ 28 static_assert(_EVENT_SIZEOF_##TYPE == sizeof(type), \ 29 "bad _EVENT_SIZEOF_" #TYPE); 30 #elif defined(EVENT__SIZEOF_SHORT) 31 # define CHECK_EVENT_SIZEOF(TYPE, type) \ 32 static_assert(EVENT__SIZEOF_##TYPE == sizeof(type), \ 33 "bad EVENT__SIZEOF_" #TYPE); 34 #else 35 # error Cannot find libevent type sizes 36 #endif 37 38 CHECK_EVENT_SIZEOF(LONG, long); 39 CHECK_EVENT_SIZEOF(LONG_LONG, long long); 40 CHECK_EVENT_SIZEOF(OFF_T, ev_off_t); 41 CHECK_EVENT_SIZEOF(PTHREAD_T, pthread_t); 42 CHECK_EVENT_SIZEOF(SHORT, short); 43 CHECK_EVENT_SIZEOF(SIZE_T, size_t); 44 CHECK_EVENT_SIZEOF(TIME_T, time_t); 45 CHECK_EVENT_SIZEOF(VOID_P, void*); 46 47 // Lifecycle of struct event 48 // Libevent uses two main data structures: 49 // struct event_base (of which there is one per message pump), and 50 // struct event (of which there is roughly one per socket). 51 // The socket's struct event is created in 52 // MessagePumpLibevent::WatchFileDescriptor(), 53 // is owned by the FileDescriptorWatcher, and is destroyed in 54 // StopWatchingFileDescriptor(). 55 // It is moved into and out of lists in struct event_base by 56 // the libevent functions event_add() and event_del(). 57 // 58 // TODO(dkegel): 59 // At the moment bad things happen if a FileDescriptorWatcher 60 // is active after its MessagePumpLibevent has been destroyed. 61 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop 62 // Not clear yet whether that situation occurs in practice, 63 // but if it does, we need to fix it. 64 65 namespace base { 66 67 // Return 0 on success 68 // Too small a function to bother putting in a library? 69 static int SetNonBlocking(int fd) { 70 int flags = fcntl(fd, F_GETFL, 0); 71 if (flags == -1) flags = 0; 72 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); 73 } 74 75 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() 76 : is_persistent_(false), event_(NULL) {} 77 78 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { 79 if (event_) { 80 StopWatchingFileDescriptor(); 81 } 82 } 83 84 void MessagePumpLibevent::FileDescriptorWatcher::Init(event* e, 85 bool is_persistent) { 86 DCHECK(e); 87 DCHECK(event_ == NULL); 88 89 is_persistent_ = is_persistent; 90 event_ = e; 91 } 92 93 event* MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { 94 struct event* e = event_; 95 event_ = NULL; 96 return e; 97 } 98 99 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { 100 event* e = ReleaseEvent(); 101 if (e == NULL) return true; 102 103 // event_del() is a no-op if the event isn't active. 104 int rv = event_del(e); 105 delete e; 106 return (rv == 0); 107 } 108 109 bool MessagePumpLibevent::awake_ = false; 110 111 // Called if a byte is received on the wakeup pipe. 112 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { 113 if (!awake_) { 114 profiler_thread_wake(); 115 awake_ = true; 116 } 117 118 AUTO_PROFILER_LABEL("MessagePumpLibevent::OnWakeup", OTHER); 119 120 base::MessagePumpLibevent* that = 121 static_cast<base::MessagePumpLibevent*>(context); 122 DCHECK(that->wakeup_pipe_out_ == socket); 123 124 // Remove and discard the wakeup byte. 125 char buf; 126 int nread = HANDLE_EINTR(read(socket, &buf, 1)); 127 DCHECK_EQ(nread, 1); 128 // Tell libevent to break out of inner loop. 129 event_base_loopbreak(that->event_base_); 130 } 131 132 MessagePumpLibevent::MessagePumpLibevent() 133 : keep_running_(true), 134 in_run_(false), 135 event_base_(event_base_new()), 136 wakeup_pipe_in_(-1), 137 wakeup_pipe_out_(-1) { 138 if (!Init()) NOTREACHED(); 139 } 140 141 bool MessagePumpLibevent::Init() { 142 int fds[2]; 143 if (pipe(fds)) { 144 DLOG(ERROR) << "pipe() failed, errno: " << errno; 145 return false; 146 } 147 if (SetNonBlocking(fds[0])) { 148 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; 149 return false; 150 } 151 if (SetNonBlocking(fds[1])) { 152 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; 153 return false; 154 } 155 wakeup_pipe_out_ = fds[0]; 156 wakeup_pipe_in_ = fds[1]; 157 158 wakeup_event_ = new event; 159 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, OnWakeup, 160 this); 161 event_base_set(event_base_, wakeup_event_); 162 163 if (event_add(wakeup_event_, 0)) return false; 164 return true; 165 } 166 167 MessagePumpLibevent::~MessagePumpLibevent() { 168 DCHECK(wakeup_event_); 169 DCHECK(event_base_); 170 event_del(wakeup_event_); 171 delete wakeup_event_; 172 if (wakeup_pipe_in_ >= 0) close(wakeup_pipe_in_); 173 if (wakeup_pipe_out_ >= 0) close(wakeup_pipe_out_); 174 event_base_free(event_base_); 175 } 176 177 bool MessagePumpLibevent::WatchFileDescriptor(int fd, bool persistent, 178 Mode mode, 179 FileDescriptorWatcher* controller, 180 Watcher* delegate) { 181 DCHECK(fd > 0); 182 DCHECK(controller); 183 DCHECK(delegate); 184 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); 185 186 int event_mask = persistent ? EV_PERSIST : 0; 187 if ((mode & WATCH_READ) != 0) { 188 event_mask |= EV_READ; 189 } 190 if ((mode & WATCH_WRITE) != 0) { 191 event_mask |= EV_WRITE; 192 } 193 194 // |should_delete_event| is true if we're modifying an event that's currently 195 // active in |controller|. 196 // If we're modifying an existing event and there's an error then we need to 197 // tell libevent to clean it up via event_delete() before returning. 198 bool should_delete_event = true; 199 mozilla::UniquePtr<event> evt(controller->ReleaseEvent()); 200 if (evt.get() == NULL) { 201 should_delete_event = false; 202 // Ownership is transferred to the controller. 203 evt = mozilla::MakeUnique<event>(); 204 } else { 205 // It's illegal to use this function to listen on 2 separate fds with the 206 // same |controller|. 207 if (EVENT_FD(evt.get()) != fd) { 208 NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd; 209 return false; 210 } 211 212 // Make sure we don't pick up any funky internal libevent masks. 213 int old_interest_mask = 214 evt.get()->ev_events & (EV_READ | EV_WRITE | EV_PERSIST); 215 216 // Combine old/new event masks. 217 event_mask |= old_interest_mask; 218 219 // Must disarm the event before we can reuse it. 220 event_del(evt.get()); 221 } 222 223 // Set current interest mask and message pump for this event. 224 event_set(evt.get(), fd, event_mask, OnLibeventNotification, delegate); 225 226 // Tell libevent which message pump this socket will belong to when we add it. 227 if (event_base_set(event_base_, evt.get()) != 0) { 228 if (should_delete_event) { 229 event_del(evt.get()); 230 } 231 return false; 232 } 233 234 // Add this socket to the list of monitored sockets. 235 if (event_add(evt.get(), NULL) != 0) { 236 if (should_delete_event) { 237 event_del(evt.get()); 238 } 239 return false; 240 } 241 242 // Transfer ownership of evt to controller. 243 controller->Init(evt.release(), persistent); 244 return true; 245 } 246 247 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, 248 void* context) { 249 if (!awake_) { 250 profiler_thread_wake(); 251 awake_ = true; 252 } 253 AUTO_PROFILER_LABEL("MessagePumpLibevent::OnLibeventNotification", OTHER); 254 255 Watcher* watcher = static_cast<Watcher*>(context); 256 257 if (flags & EV_WRITE) { 258 watcher->OnFileCanWriteWithoutBlocking(fd); 259 } 260 if (flags & EV_READ) { 261 watcher->OnFileCanReadWithoutBlocking(fd); 262 } 263 } 264 265 // Reentrant! 266 void MessagePumpLibevent::Run(Delegate* delegate) { 267 DCHECK(keep_running_) << "Quit must have been called outside of Run!"; 268 269 bool old_in_run = in_run_; 270 in_run_ = true; 271 272 for (;;) { 273 ScopedNSAutoreleasePool autorelease_pool; 274 275 bool did_work = delegate->DoWork(); 276 if (!keep_running_) break; 277 278 did_work |= delegate->DoDelayedWork(&delayed_work_time_); 279 if (!keep_running_) break; 280 281 if (did_work) continue; 282 283 did_work = delegate->DoIdleWork(); 284 if (!keep_running_) break; 285 286 if (did_work) continue; 287 288 // EVLOOP_ONCE tells libevent to only block once, 289 // but to service all pending events when it wakes up. 290 AUTO_PROFILER_LABEL("MessagePumpLibevent::Run::Wait", IDLE); 291 if (delayed_work_time_.is_null()) { 292 profiler_thread_sleep(); 293 awake_ = false; 294 event_base_loop(event_base_, EVLOOP_ONCE); 295 } else { 296 TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); 297 if (delay > TimeDelta()) { 298 struct timeval poll_tv; 299 poll_tv.tv_sec = delay.InSeconds(); 300 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; 301 event_base_loopexit(event_base_, &poll_tv); 302 profiler_thread_sleep(); 303 awake_ = false; 304 event_base_loop(event_base_, EVLOOP_ONCE); 305 } else { 306 // It looks like delayed_work_time_ indicates a time in the past, so we 307 // need to call DoDelayedWork now. 308 delayed_work_time_ = TimeTicks(); 309 } 310 } 311 } 312 313 keep_running_ = true; 314 in_run_ = old_in_run; 315 } 316 317 void MessagePumpLibevent::Quit() { 318 DCHECK(in_run_); 319 // Tell both libevent and Run that they should break out of their loops. 320 keep_running_ = false; 321 ScheduleWork(); 322 } 323 324 void MessagePumpLibevent::ScheduleWork() { 325 // Tell libevent (in a threadsafe way) that it should break out of its loop. 326 char buf = 0; 327 int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1)); 328 DCHECK(nwrite == 1 || errno == EAGAIN) 329 << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; 330 } 331 332 void MessagePumpLibevent::ScheduleDelayedWork( 333 const TimeTicks& delayed_work_time) { 334 // We know that we can't be blocked on Wait right now since this method can 335 // only be called on the same thread as Run, so we only need to update our 336 // record of how long to sleep when we do sleep. 337 delayed_work_time_ = delayed_work_time; 338 } 339 340 } // namespace base