ipc/chromium/src/base/message_pump_libevent.cc

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 // Copyright (c) 2008 The Chromium Authors. All rights reserved.
michael@0 2 // Use of this source code is governed by a BSD-style license that can be
michael@0 3 // found in the LICENSE file.
michael@0 4
michael@0 5 #include "base/message_pump_libevent.h"
michael@0 6
michael@0 7 #include <errno.h>
michael@0 8 #include <fcntl.h>
michael@0 9 #if defined(ANDROID) || defined(OS_POSIX)
michael@0 10 #include <unistd.h>
michael@0 11 #endif
michael@0 12
michael@0 13 #include "eintr_wrapper.h"
michael@0 14 #include "base/logging.h"
michael@0 15 #include "base/scoped_nsautorelease_pool.h"
michael@0 16 #include "base/scoped_ptr.h"
michael@0 17 #include "base/time.h"
michael@0 18 #include "nsDependentSubstring.h"
michael@0 19 #include "third_party/libevent/event.h"
michael@0 20
michael@0 21 // Lifecycle of struct event
michael@0 22 // Libevent uses two main data structures:
michael@0 23 // struct event_base (of which there is one per message pump), and
michael@0 24 // struct event (of which there is roughly one per socket).
michael@0 25 // The socket's struct event is created in
michael@0 26 // MessagePumpLibevent::WatchFileDescriptor(),
michael@0 27 // is owned by the FileDescriptorWatcher, and is destroyed in
michael@0 28 // StopWatchingFileDescriptor().
michael@0 29 // It is moved into and out of lists in struct event_base by
michael@0 30 // the libevent functions event_add() and event_del().
michael@0 31 //
michael@0 32 // TODO(dkegel):
michael@0 33 // At the moment bad things happen if a FileDescriptorWatcher
michael@0 34 // is active after its MessagePumpLibevent has been destroyed.
michael@0 35 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
michael@0 36 // Not clear yet whether that situation occurs in practice,
michael@0 37 // but if it does, we need to fix it.
michael@0 38
michael@0 39 namespace base {
michael@0 40
michael@0 41 // Return 0 on success
michael@0 42 // Too small a function to bother putting in a library?
michael@0 43 static int SetNonBlocking(int fd) {
michael@0 44 int flags = fcntl(fd, F_GETFL, 0);
michael@0 45 if (flags == -1)
michael@0 46 flags = 0;
michael@0 47 return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
michael@0 48 }
michael@0 49
michael@0 50 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
michael@0 51 : is_persistent_(false),
michael@0 52 event_(NULL) {
michael@0 53 }
michael@0 54
michael@0 55 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
michael@0 56 if (event_) {
michael@0 57 StopWatchingFileDescriptor();
michael@0 58 }
michael@0 59 }
michael@0 60
michael@0 61 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
michael@0 62 bool is_persistent) {
michael@0 63 DCHECK(e);
michael@0 64 DCHECK(event_ == NULL);
michael@0 65
michael@0 66 is_persistent_ = is_persistent;
michael@0 67 event_ = e;
michael@0 68 }
michael@0 69
michael@0 70 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
michael@0 71 struct event *e = event_;
michael@0 72 event_ = NULL;
michael@0 73 return e;
michael@0 74 }
michael@0 75
michael@0 76 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
michael@0 77 event* e = ReleaseEvent();
michael@0 78 if (e == NULL)
michael@0 79 return true;
michael@0 80
michael@0 81 // event_del() is a no-op if the event isn't active.
michael@0 82 int rv = event_del(e);
michael@0 83 delete e;
michael@0 84 return (rv == 0);
michael@0 85 }
michael@0 86
michael@0 87 // Called if a byte is received on the wakeup pipe.
michael@0 88 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
michael@0 89 base::MessagePumpLibevent* that =
michael@0 90 static_cast<base::MessagePumpLibevent*>(context);
michael@0 91 DCHECK(that->wakeup_pipe_out_ == socket);
michael@0 92
michael@0 93 // Remove and discard the wakeup byte.
michael@0 94 char buf;
michael@0 95 int nread = HANDLE_EINTR(read(socket, &buf, 1));
michael@0 96 DCHECK_EQ(nread, 1);
michael@0 97 // Tell libevent to break out of inner loop.
michael@0 98 event_base_loopbreak(that->event_base_);
michael@0 99 }
michael@0 100
michael@0 101 MessagePumpLibevent::MessagePumpLibevent()
michael@0 102 : keep_running_(true),
michael@0 103 in_run_(false),
michael@0 104 event_base_(event_base_new()),
michael@0 105 wakeup_pipe_in_(-1),
michael@0 106 wakeup_pipe_out_(-1) {
michael@0 107 if (!Init())
michael@0 108 NOTREACHED();
michael@0 109 }
michael@0 110
michael@0 111 bool MessagePumpLibevent::Init() {
michael@0 112 int fds[2];
michael@0 113 if (pipe(fds)) {
michael@0 114 DLOG(ERROR) << "pipe() failed, errno: " << errno;
michael@0 115 return false;
michael@0 116 }
michael@0 117 if (SetNonBlocking(fds[0])) {
michael@0 118 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
michael@0 119 return false;
michael@0 120 }
michael@0 121 if (SetNonBlocking(fds[1])) {
michael@0 122 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
michael@0 123 return false;
michael@0 124 }
michael@0 125 wakeup_pipe_out_ = fds[0];
michael@0 126 wakeup_pipe_in_ = fds[1];
michael@0 127
michael@0 128 wakeup_event_ = new event;
michael@0 129 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
michael@0 130 OnWakeup, this);
michael@0 131 event_base_set(event_base_, wakeup_event_);
michael@0 132
michael@0 133 if (event_add(wakeup_event_, 0))
michael@0 134 return false;
michael@0 135 return true;
michael@0 136 }
michael@0 137
michael@0 138 MessagePumpLibevent::~MessagePumpLibevent() {
michael@0 139 DCHECK(wakeup_event_);
michael@0 140 DCHECK(event_base_);
michael@0 141 event_del(wakeup_event_);
michael@0 142 delete wakeup_event_;
michael@0 143 if (wakeup_pipe_in_ >= 0)
michael@0 144 close(wakeup_pipe_in_);
michael@0 145 if (wakeup_pipe_out_ >= 0)
michael@0 146 close(wakeup_pipe_out_);
michael@0 147 event_base_free(event_base_);
michael@0 148 }
michael@0 149
michael@0 150 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
michael@0 151 bool persistent,
michael@0 152 Mode mode,
michael@0 153 FileDescriptorWatcher *controller,
michael@0 154 Watcher *delegate) {
michael@0 155 DCHECK(fd > 0);
michael@0 156 DCHECK(controller);
michael@0 157 DCHECK(delegate);
michael@0 158 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
michael@0 159
michael@0 160 int event_mask = persistent ? EV_PERSIST : 0;
michael@0 161 if ((mode & WATCH_READ) != 0) {
michael@0 162 event_mask |= EV_READ;
michael@0 163 }
michael@0 164 if ((mode & WATCH_WRITE) != 0) {
michael@0 165 event_mask |= EV_WRITE;
michael@0 166 }
michael@0 167
michael@0 168 // |should_delete_event| is true if we're modifying an event that's currently
michael@0 169 // active in |controller|.
michael@0 170 // If we're modifying an existing event and there's an error then we need to
michael@0 171 // tell libevent to clean it up via event_delete() before returning.
michael@0 172 bool should_delete_event = true;
michael@0 173 scoped_ptr<event> evt(controller->ReleaseEvent());
michael@0 174 if (evt.get() == NULL) {
michael@0 175 should_delete_event = false;
michael@0 176 // Ownership is transferred to the controller.
michael@0 177 evt.reset(new event);
michael@0 178 }
michael@0 179
michael@0 180 // Set current interest mask and message pump for this event.
michael@0 181 event_set(evt.get(), fd, event_mask, OnLibeventNotification,
michael@0 182 delegate);
michael@0 183
michael@0 184 // Tell libevent which message pump this socket will belong to when we add it.
michael@0 185 if (event_base_set(event_base_, evt.get()) != 0) {
michael@0 186 if (should_delete_event) {
michael@0 187 event_del(evt.get());
michael@0 188 }
michael@0 189 return false;
michael@0 190 }
michael@0 191
michael@0 192 // Add this socket to the list of monitored sockets.
michael@0 193 if (event_add(evt.get(), NULL) != 0) {
michael@0 194 if (should_delete_event) {
michael@0 195 event_del(evt.get());
michael@0 196 }
michael@0 197 return false;
michael@0 198 }
michael@0 199
michael@0 200 // Transfer ownership of evt to controller.
michael@0 201 controller->Init(evt.release(), persistent);
michael@0 202 return true;
michael@0 203 }
michael@0 204
michael@0 205
michael@0 206 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
michael@0 207 void* context) {
michael@0 208 Watcher* watcher = static_cast<Watcher*>(context);
michael@0 209
michael@0 210 if (flags & EV_WRITE) {
michael@0 211 watcher->OnFileCanWriteWithoutBlocking(fd);
michael@0 212 }
michael@0 213 if (flags & EV_READ) {
michael@0 214 watcher->OnFileCanReadWithoutBlocking(fd);
michael@0 215 }
michael@0 216 }
michael@0 217
michael@0 218
michael@0 219 MessagePumpLibevent::SignalEvent::SignalEvent() :
michael@0 220 event_(NULL)
michael@0 221 {
michael@0 222 }
michael@0 223
michael@0 224 MessagePumpLibevent::SignalEvent::~SignalEvent()
michael@0 225 {
michael@0 226 if (event_) {
michael@0 227 StopCatching();
michael@0 228 }
michael@0 229 }
michael@0 230
michael@0 231 void
michael@0 232 MessagePumpLibevent::SignalEvent::Init(event *e)
michael@0 233 {
michael@0 234 DCHECK(e);
michael@0 235 DCHECK(event_ == NULL);
michael@0 236 event_ = e;
michael@0 237 }
michael@0 238
michael@0 239 bool
michael@0 240 MessagePumpLibevent::SignalEvent::StopCatching()
michael@0 241 {
michael@0 242 // XXX/cjones: this code could be shared with
michael@0 243 // FileDescriptorWatcher. ironic that libevent is "more"
michael@0 244 // object-oriented than this C++
michael@0 245 event* e = ReleaseEvent();
michael@0 246 if (e == NULL)
michael@0 247 return true;
michael@0 248
michael@0 249 // event_del() is a no-op if the event isn't active.
michael@0 250 int rv = event_del(e);
michael@0 251 delete e;
michael@0 252 return (rv == 0);
michael@0 253 }
michael@0 254
michael@0 255 event *
michael@0 256 MessagePumpLibevent::SignalEvent::ReleaseEvent()
michael@0 257 {
michael@0 258 event *e = event_;
michael@0 259 event_ = NULL;
michael@0 260 return e;
michael@0 261 }
michael@0 262
michael@0 263 bool
michael@0 264 MessagePumpLibevent::CatchSignal(int sig,
michael@0 265 SignalEvent* sigevent,
michael@0 266 SignalWatcher* delegate)
michael@0 267 {
michael@0 268 DCHECK(sig > 0);
michael@0 269 DCHECK(sigevent);
michael@0 270 DCHECK(delegate);
michael@0 271 // TODO if we want to support re-using SignalEvents, this code needs
michael@0 272 // to jump through the same hoops as WatchFileDescriptor(). Not
michael@0 273 // needed at present
michael@0 274 DCHECK(NULL == sigevent->event_);
michael@0 275
michael@0 276 scoped_ptr<event> evt(new event);
michael@0 277 signal_set(evt.get(), sig, OnLibeventSignalNotification, delegate);
michael@0 278
michael@0 279 if (event_base_set(event_base_, evt.get()))
michael@0 280 return false;
michael@0 281
michael@0 282 if (signal_add(evt.get(), NULL))
michael@0 283 return false;
michael@0 284
michael@0 285 // Transfer ownership of evt to controller.
michael@0 286 sigevent->Init(evt.release());
michael@0 287 return true;
michael@0 288 }
michael@0 289
michael@0 290 void
michael@0 291 MessagePumpLibevent::OnLibeventSignalNotification(int sig, short flags,
michael@0 292 void* context)
michael@0 293 {
michael@0 294 DCHECK(sig > 0);
michael@0 295 DCHECK(EV_SIGNAL == flags);
michael@0 296 DCHECK(context);
michael@0 297 reinterpret_cast<SignalWatcher*>(context)->OnSignal(sig);
michael@0 298 }
michael@0 299
michael@0 300
michael@0 301 // Reentrant!
michael@0 302 void MessagePumpLibevent::Run(Delegate* delegate) {
michael@0 303 DCHECK(keep_running_) << "Quit must have been called outside of Run!";
michael@0 304
michael@0 305 bool old_in_run = in_run_;
michael@0 306 in_run_ = true;
michael@0 307
michael@0 308 for (;;) {
michael@0 309 ScopedNSAutoreleasePool autorelease_pool;
michael@0 310
michael@0 311 bool did_work = delegate->DoWork();
michael@0 312 if (!keep_running_)
michael@0 313 break;
michael@0 314
michael@0 315 did_work |= delegate->DoDelayedWork(&delayed_work_time_);
michael@0 316 if (!keep_running_)
michael@0 317 break;
michael@0 318
michael@0 319 if (did_work)
michael@0 320 continue;
michael@0 321
michael@0 322 did_work = delegate->DoIdleWork();
michael@0 323 if (!keep_running_)
michael@0 324 break;
michael@0 325
michael@0 326 if (did_work)
michael@0 327 continue;
michael@0 328
michael@0 329 // EVLOOP_ONCE tells libevent to only block once,
michael@0 330 // but to service all pending events when it wakes up.
michael@0 331 if (delayed_work_time_.is_null()) {
michael@0 332 event_base_loop(event_base_, EVLOOP_ONCE);
michael@0 333 } else {
michael@0 334 TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
michael@0 335 if (delay > TimeDelta()) {
michael@0 336 struct timeval poll_tv;
michael@0 337 poll_tv.tv_sec = delay.InSeconds();
michael@0 338 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
michael@0 339 event_base_loopexit(event_base_, &poll_tv);
michael@0 340 event_base_loop(event_base_, EVLOOP_ONCE);
michael@0 341 } else {
michael@0 342 // It looks like delayed_work_time_ indicates a time in the past, so we
michael@0 343 // need to call DoDelayedWork now.
michael@0 344 delayed_work_time_ = TimeTicks();
michael@0 345 }
michael@0 346 }
michael@0 347 }
michael@0 348
michael@0 349 keep_running_ = true;
michael@0 350 in_run_ = old_in_run;
michael@0 351 }
michael@0 352
michael@0 353 void MessagePumpLibevent::Quit() {
michael@0 354 DCHECK(in_run_);
michael@0 355 // Tell both libevent and Run that they should break out of their loops.
michael@0 356 keep_running_ = false;
michael@0 357 ScheduleWork();
michael@0 358 }
michael@0 359
michael@0 360 void MessagePumpLibevent::ScheduleWork() {
michael@0 361 // Tell libevent (in a threadsafe way) that it should break out of its loop.
michael@0 362 char buf = 0;
michael@0 363 int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
michael@0 364 DCHECK(nwrite == 1 || errno == EAGAIN)
michael@0 365 << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
michael@0 366 }
michael@0 367
michael@0 368 void MessagePumpLibevent::ScheduleDelayedWork(
michael@0 369 const TimeTicks& delayed_work_time) {
michael@0 370 // We know that we can't be blocked on Wait right now since this method can
michael@0 371 // only be called on the same thread as Run, so we only need to update our
michael@0 372 // record of how long to sleep when we do sleep.
michael@0 373 delayed_work_time_ = delayed_work_time;
michael@0 374 }
michael@0 375
michael@0 376 void LineWatcher::OnFileCanReadWithoutBlocking(int aFd)
michael@0 377 {
michael@0 378 ssize_t length = 0;
michael@0 379
michael@0 380 while (true) {
michael@0 381 length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex);
michael@0 382 DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex));
michael@0 383 if (length <= 0) {
michael@0 384 if (length < 0) {
michael@0 385 if (errno == EINTR) {
michael@0 386 continue; // retry system call when interrupted
michael@0 387 }
michael@0 388 if (errno == EAGAIN || errno == EWOULDBLOCK) {
michael@0 389 return; // no data available: return and re-poll
michael@0 390 }
michael@0 391 DLOG(ERROR) << "Can't read from fd, error " << errno;
michael@0 392 } else {
michael@0 393 DLOG(ERROR) << "End of file";
michael@0 394 }
michael@0 395 // At this point, assume that we can't actually access
michael@0 396 // the socket anymore, and indicate an error.
michael@0 397 OnError();
michael@0 398 mReceivedIndex = 0;
michael@0 399 return;
michael@0 400 }
michael@0 401
michael@0 402 while (length-- > 0) {
michael@0 403 DCHECK(mReceivedIndex < mBufferSize);
michael@0 404 if (mReceiveBuffer[mReceivedIndex] == mTerminator) {
michael@0 405 nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex);
michael@0 406 OnLineRead(aFd, message);
michael@0 407 if (length > 0) {
michael@0 408 DCHECK(mReceivedIndex < (mBufferSize - 1));
michael@0 409 memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length);
michael@0 410 }
michael@0 411 mReceivedIndex = 0;
michael@0 412 } else {
michael@0 413 mReceivedIndex++;
michael@0 414 }
michael@0 415 }
michael@0 416 }
michael@0 417 }
michael@0 418 } // namespace base

mercurial