ipc/chromium/src/base/message_pump_libevent.cc

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 // Copyright (c) 2008 The Chromium Authors. All rights reserved.
     2 // Use of this source code is governed by a BSD-style license that can be
     3 // found in the LICENSE file.
     5 #include "base/message_pump_libevent.h"
     7 #include <errno.h>
     8 #include <fcntl.h>
     9 #if defined(ANDROID) || defined(OS_POSIX)
    10 #include <unistd.h>
    11 #endif
    13 #include "eintr_wrapper.h"
    14 #include "base/logging.h"
    15 #include "base/scoped_nsautorelease_pool.h"
    16 #include "base/scoped_ptr.h"
    17 #include "base/time.h"
    18 #include "nsDependentSubstring.h"
    19 #include "third_party/libevent/event.h"
    21 // Lifecycle of struct event
    22 // Libevent uses two main data structures:
    23 // struct event_base (of which there is one per message pump), and
    24 // struct event (of which there is roughly one per socket).
    25 // The socket's struct event is created in
    26 // MessagePumpLibevent::WatchFileDescriptor(),
    27 // is owned by the FileDescriptorWatcher, and is destroyed in
    28 // StopWatchingFileDescriptor().
    29 // It is moved into and out of lists in struct event_base by
    30 // the libevent functions event_add() and event_del().
    31 //
    32 // TODO(dkegel):
    33 // At the moment bad things happen if a FileDescriptorWatcher
    34 // is active after its MessagePumpLibevent has been destroyed.
    35 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
    36 // Not clear yet whether that situation occurs in practice,
    37 // but if it does, we need to fix it.
    39 namespace base {
    41 // Return 0 on success
    42 // Too small a function to bother putting in a library?
    43 static int SetNonBlocking(int fd) {
    44   int flags = fcntl(fd, F_GETFL, 0);
    45   if (flags == -1)
    46     flags = 0;
    47   return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    48 }
    50 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
    51     : is_persistent_(false),
    52       event_(NULL) {
    53 }
    55 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
    56   if (event_) {
    57     StopWatchingFileDescriptor();
    58   }
    59 }
    61 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
    62                                                       bool is_persistent) {
    63   DCHECK(e);
    64   DCHECK(event_ == NULL);
    66   is_persistent_ = is_persistent;
    67   event_ = e;
    68 }
    70 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
    71   struct event *e = event_;
    72   event_ = NULL;
    73   return e;
    74 }
    76 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
    77   event* e = ReleaseEvent();
    78   if (e == NULL)
    79     return true;
    81   // event_del() is a no-op if the event isn't active.
    82   int rv = event_del(e);
    83   delete e;
    84   return (rv == 0);
    85 }
    87 // Called if a byte is received on the wakeup pipe.
    88 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
    89   base::MessagePumpLibevent* that =
    90               static_cast<base::MessagePumpLibevent*>(context);
    91   DCHECK(that->wakeup_pipe_out_ == socket);
    93   // Remove and discard the wakeup byte.
    94   char buf;
    95   int nread = HANDLE_EINTR(read(socket, &buf, 1));
    96   DCHECK_EQ(nread, 1);
    97   // Tell libevent to break out of inner loop.
    98   event_base_loopbreak(that->event_base_);
    99 }
   101 MessagePumpLibevent::MessagePumpLibevent()
   102     : keep_running_(true),
   103       in_run_(false),
   104       event_base_(event_base_new()),
   105       wakeup_pipe_in_(-1),
   106       wakeup_pipe_out_(-1) {
   107   if (!Init())
   108      NOTREACHED();
   109 }
   111 bool MessagePumpLibevent::Init() {
   112   int fds[2];
   113   if (pipe(fds)) {
   114     DLOG(ERROR) << "pipe() failed, errno: " << errno;
   115     return false;
   116   }
   117   if (SetNonBlocking(fds[0])) {
   118     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
   119     return false;
   120   }
   121   if (SetNonBlocking(fds[1])) {
   122     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
   123     return false;
   124   }
   125   wakeup_pipe_out_ = fds[0];
   126   wakeup_pipe_in_ = fds[1];
   128   wakeup_event_ = new event;
   129   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
   130             OnWakeup, this);
   131   event_base_set(event_base_, wakeup_event_);
   133   if (event_add(wakeup_event_, 0))
   134     return false;
   135   return true;
   136 }
   138 MessagePumpLibevent::~MessagePumpLibevent() {
   139   DCHECK(wakeup_event_);
   140   DCHECK(event_base_);
   141   event_del(wakeup_event_);
   142   delete wakeup_event_;
   143   if (wakeup_pipe_in_ >= 0)
   144     close(wakeup_pipe_in_);
   145   if (wakeup_pipe_out_ >= 0)
   146     close(wakeup_pipe_out_);
   147   event_base_free(event_base_);
   148 }
   150 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
   151                                               bool persistent,
   152                                               Mode mode,
   153                                               FileDescriptorWatcher *controller,
   154                                               Watcher *delegate) {
   155   DCHECK(fd > 0);
   156   DCHECK(controller);
   157   DCHECK(delegate);
   158   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
   160   int event_mask = persistent ? EV_PERSIST : 0;
   161   if ((mode & WATCH_READ) != 0) {
   162     event_mask |= EV_READ;
   163   }
   164   if ((mode & WATCH_WRITE) != 0) {
   165     event_mask |= EV_WRITE;
   166   }
   168   // |should_delete_event| is true if we're modifying an event that's currently
   169   // active in |controller|.
   170   // If we're modifying an existing event and there's an error then we need to
   171   // tell libevent to clean it up via event_delete() before returning.
   172   bool should_delete_event = true;
   173   scoped_ptr<event> evt(controller->ReleaseEvent());
   174   if (evt.get() == NULL) {
   175     should_delete_event = false;
   176     // Ownership is transferred to the controller.
   177     evt.reset(new event);
   178   }
   180   // Set current interest mask and message pump for this event.
   181   event_set(evt.get(), fd, event_mask, OnLibeventNotification,
   182             delegate);
   184   // Tell libevent which message pump this socket will belong to when we add it.
   185   if (event_base_set(event_base_, evt.get()) != 0) {
   186     if (should_delete_event) {
   187       event_del(evt.get());
   188     }
   189     return false;
   190   }
   192   // Add this socket to the list of monitored sockets.
   193   if (event_add(evt.get(), NULL) != 0) {
   194     if (should_delete_event) {
   195       event_del(evt.get());
   196     }
   197     return false;
   198   }
   200   // Transfer ownership of evt to controller.
   201   controller->Init(evt.release(), persistent);
   202   return true;
   203 }
   206 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
   207                                                  void* context) {
   208   Watcher* watcher = static_cast<Watcher*>(context);
   210   if (flags & EV_WRITE) {
   211     watcher->OnFileCanWriteWithoutBlocking(fd);
   212   }
   213   if (flags & EV_READ) {
   214     watcher->OnFileCanReadWithoutBlocking(fd);
   215   }
   216 }
   219 MessagePumpLibevent::SignalEvent::SignalEvent() :
   220   event_(NULL)
   221 {
   222 }
   224 MessagePumpLibevent::SignalEvent::~SignalEvent()
   225 {
   226   if (event_) {
   227     StopCatching();
   228   }
   229 }
   231 void
   232 MessagePumpLibevent::SignalEvent::Init(event *e)
   233 {
   234   DCHECK(e);
   235   DCHECK(event_ == NULL);
   236   event_ = e;
   237 }
   239 bool
   240 MessagePumpLibevent::SignalEvent::StopCatching()
   241 {
   242   // XXX/cjones: this code could be shared with
   243   // FileDescriptorWatcher. ironic that libevent is "more"
   244   // object-oriented than this C++
   245   event* e = ReleaseEvent();
   246   if (e == NULL)
   247     return true;
   249   // event_del() is a no-op if the event isn't active.
   250   int rv = event_del(e);
   251   delete e;
   252   return (rv == 0);
   253 }
   255 event *
   256 MessagePumpLibevent::SignalEvent::ReleaseEvent()
   257 {
   258   event *e = event_;
   259   event_ = NULL;
   260   return e;
   261 }
   263 bool
   264 MessagePumpLibevent::CatchSignal(int sig,
   265                                  SignalEvent* sigevent,
   266                                  SignalWatcher* delegate)
   267 {
   268   DCHECK(sig > 0);
   269   DCHECK(sigevent);
   270   DCHECK(delegate);
   271   // TODO if we want to support re-using SignalEvents, this code needs
   272   // to jump through the same hoops as WatchFileDescriptor().  Not
   273   // needed at present
   274   DCHECK(NULL == sigevent->event_);
   276   scoped_ptr<event> evt(new event);
   277   signal_set(evt.get(), sig, OnLibeventSignalNotification, delegate);
   279   if (event_base_set(event_base_, evt.get()))
   280     return false;
   282   if (signal_add(evt.get(), NULL))
   283     return false;
   285   // Transfer ownership of evt to controller.
   286   sigevent->Init(evt.release());
   287   return true;
   288 }
   290 void
   291 MessagePumpLibevent::OnLibeventSignalNotification(int sig, short flags,
   292                                                   void* context)
   293 {
   294   DCHECK(sig > 0);
   295   DCHECK(EV_SIGNAL == flags);
   296   DCHECK(context);
   297   reinterpret_cast<SignalWatcher*>(context)->OnSignal(sig);
   298 }
   301 // Reentrant!
   302 void MessagePumpLibevent::Run(Delegate* delegate) {
   303   DCHECK(keep_running_) << "Quit must have been called outside of Run!";
   305   bool old_in_run = in_run_;
   306   in_run_ = true;
   308   for (;;) {
   309     ScopedNSAutoreleasePool autorelease_pool;
   311     bool did_work = delegate->DoWork();
   312     if (!keep_running_)
   313       break;
   315     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
   316     if (!keep_running_)
   317       break;
   319     if (did_work)
   320       continue;
   322     did_work = delegate->DoIdleWork();
   323     if (!keep_running_)
   324       break;
   326     if (did_work)
   327       continue;
   329     // EVLOOP_ONCE tells libevent to only block once,
   330     // but to service all pending events when it wakes up.
   331     if (delayed_work_time_.is_null()) {
   332       event_base_loop(event_base_, EVLOOP_ONCE);
   333     } else {
   334       TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
   335       if (delay > TimeDelta()) {
   336         struct timeval poll_tv;
   337         poll_tv.tv_sec = delay.InSeconds();
   338         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
   339         event_base_loopexit(event_base_, &poll_tv);
   340         event_base_loop(event_base_, EVLOOP_ONCE);
   341       } else {
   342         // It looks like delayed_work_time_ indicates a time in the past, so we
   343         // need to call DoDelayedWork now.
   344         delayed_work_time_ = TimeTicks();
   345       }
   346     }
   347   }
   349   keep_running_ = true;
   350   in_run_ = old_in_run;
   351 }
   353 void MessagePumpLibevent::Quit() {
   354   DCHECK(in_run_);
   355   // Tell both libevent and Run that they should break out of their loops.
   356   keep_running_ = false;
   357   ScheduleWork();
   358 }
   360 void MessagePumpLibevent::ScheduleWork() {
   361   // Tell libevent (in a threadsafe way) that it should break out of its loop.
   362   char buf = 0;
   363   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
   364   DCHECK(nwrite == 1 || errno == EAGAIN)
   365       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
   366 }
   368 void MessagePumpLibevent::ScheduleDelayedWork(
   369     const TimeTicks& delayed_work_time) {
   370   // We know that we can't be blocked on Wait right now since this method can
   371   // only be called on the same thread as Run, so we only need to update our
   372   // record of how long to sleep when we do sleep.
   373   delayed_work_time_ = delayed_work_time;
   374 }
   376 void LineWatcher::OnFileCanReadWithoutBlocking(int aFd)
   377 {
   378   ssize_t length = 0;
   380   while (true) {
   381     length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex);
   382     DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex));
   383     if (length <= 0) {
   384       if (length < 0) {
   385         if (errno == EINTR) {
   386           continue; // retry system call when interrupted
   387         }
   388         if (errno == EAGAIN || errno == EWOULDBLOCK) {
   389           return; // no data available: return and re-poll
   390         }
   391         DLOG(ERROR) << "Can't read from fd, error " << errno;
   392       } else {
   393         DLOG(ERROR) << "End of file";
   394       }
   395       // At this point, assume that we can't actually access
   396       // the socket anymore, and indicate an error.
   397       OnError();
   398       mReceivedIndex = 0;
   399       return;
   400     }
   402     while (length-- > 0) {
   403       DCHECK(mReceivedIndex < mBufferSize);
   404       if (mReceiveBuffer[mReceivedIndex] == mTerminator) {
   405         nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex);
   406         OnLineRead(aFd, message);
   407         if (length > 0) {
   408           DCHECK(mReceivedIndex < (mBufferSize - 1));
   409           memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length);
   410         }
   411         mReceivedIndex = 0;
   412       } else {
   413         mReceivedIndex++;
   414       }
   415     }
   416   }
   417 }
   418 }  // namespace base

mercurial