michael@0: // Copyright (c) 2008 The Chromium Authors. All rights reserved. michael@0: // Use of this source code is governed by a BSD-style license that can be michael@0: // found in the LICENSE file. michael@0: michael@0: #include "base/message_pump_libevent.h" michael@0: michael@0: #include michael@0: #include michael@0: #if defined(ANDROID) || defined(OS_POSIX) michael@0: #include michael@0: #endif michael@0: michael@0: #include "eintr_wrapper.h" michael@0: #include "base/logging.h" michael@0: #include "base/scoped_nsautorelease_pool.h" michael@0: #include "base/scoped_ptr.h" michael@0: #include "base/time.h" michael@0: #include "nsDependentSubstring.h" michael@0: #include "third_party/libevent/event.h" michael@0: michael@0: // Lifecycle of struct event michael@0: // Libevent uses two main data structures: michael@0: // struct event_base (of which there is one per message pump), and michael@0: // struct event (of which there is roughly one per socket). michael@0: // The socket's struct event is created in michael@0: // MessagePumpLibevent::WatchFileDescriptor(), michael@0: // is owned by the FileDescriptorWatcher, and is destroyed in michael@0: // StopWatchingFileDescriptor(). michael@0: // It is moved into and out of lists in struct event_base by michael@0: // the libevent functions event_add() and event_del(). michael@0: // michael@0: // TODO(dkegel): michael@0: // At the moment bad things happen if a FileDescriptorWatcher michael@0: // is active after its MessagePumpLibevent has been destroyed. michael@0: // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop michael@0: // Not clear yet whether that situation occurs in practice, michael@0: // but if it does, we need to fix it. michael@0: michael@0: namespace base { michael@0: michael@0: // Return 0 on success michael@0: // Too small a function to bother putting in a library? michael@0: static int SetNonBlocking(int fd) { michael@0: int flags = fcntl(fd, F_GETFL, 0); michael@0: if (flags == -1) michael@0: flags = 0; michael@0: return fcntl(fd, F_SETFL, flags | O_NONBLOCK); michael@0: } michael@0: michael@0: MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() michael@0: : is_persistent_(false), michael@0: event_(NULL) { michael@0: } michael@0: michael@0: MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { michael@0: if (event_) { michael@0: StopWatchingFileDescriptor(); michael@0: } michael@0: } michael@0: michael@0: void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, michael@0: bool is_persistent) { michael@0: DCHECK(e); michael@0: DCHECK(event_ == NULL); michael@0: michael@0: is_persistent_ = is_persistent; michael@0: event_ = e; michael@0: } michael@0: michael@0: event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { michael@0: struct event *e = event_; michael@0: event_ = NULL; michael@0: return e; michael@0: } michael@0: michael@0: bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { michael@0: event* e = ReleaseEvent(); michael@0: if (e == NULL) michael@0: return true; michael@0: michael@0: // event_del() is a no-op if the event isn't active. michael@0: int rv = event_del(e); michael@0: delete e; michael@0: return (rv == 0); michael@0: } michael@0: michael@0: // Called if a byte is received on the wakeup pipe. michael@0: void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { michael@0: base::MessagePumpLibevent* that = michael@0: static_cast(context); michael@0: DCHECK(that->wakeup_pipe_out_ == socket); michael@0: michael@0: // Remove and discard the wakeup byte. michael@0: char buf; michael@0: int nread = HANDLE_EINTR(read(socket, &buf, 1)); michael@0: DCHECK_EQ(nread, 1); michael@0: // Tell libevent to break out of inner loop. michael@0: event_base_loopbreak(that->event_base_); michael@0: } michael@0: michael@0: MessagePumpLibevent::MessagePumpLibevent() michael@0: : keep_running_(true), michael@0: in_run_(false), michael@0: event_base_(event_base_new()), michael@0: wakeup_pipe_in_(-1), michael@0: wakeup_pipe_out_(-1) { michael@0: if (!Init()) michael@0: NOTREACHED(); michael@0: } michael@0: michael@0: bool MessagePumpLibevent::Init() { michael@0: int fds[2]; michael@0: if (pipe(fds)) { michael@0: DLOG(ERROR) << "pipe() failed, errno: " << errno; michael@0: return false; michael@0: } michael@0: if (SetNonBlocking(fds[0])) { michael@0: DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; michael@0: return false; michael@0: } michael@0: if (SetNonBlocking(fds[1])) { michael@0: DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; michael@0: return false; michael@0: } michael@0: wakeup_pipe_out_ = fds[0]; michael@0: wakeup_pipe_in_ = fds[1]; michael@0: michael@0: wakeup_event_ = new event; michael@0: event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, michael@0: OnWakeup, this); michael@0: event_base_set(event_base_, wakeup_event_); michael@0: michael@0: if (event_add(wakeup_event_, 0)) michael@0: return false; michael@0: return true; michael@0: } michael@0: michael@0: MessagePumpLibevent::~MessagePumpLibevent() { michael@0: DCHECK(wakeup_event_); michael@0: DCHECK(event_base_); michael@0: event_del(wakeup_event_); michael@0: delete wakeup_event_; michael@0: if (wakeup_pipe_in_ >= 0) michael@0: close(wakeup_pipe_in_); michael@0: if (wakeup_pipe_out_ >= 0) michael@0: close(wakeup_pipe_out_); michael@0: event_base_free(event_base_); michael@0: } michael@0: michael@0: bool MessagePumpLibevent::WatchFileDescriptor(int fd, michael@0: bool persistent, michael@0: Mode mode, michael@0: FileDescriptorWatcher *controller, michael@0: Watcher *delegate) { michael@0: DCHECK(fd > 0); michael@0: DCHECK(controller); michael@0: DCHECK(delegate); michael@0: DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); michael@0: michael@0: int event_mask = persistent ? EV_PERSIST : 0; michael@0: if ((mode & WATCH_READ) != 0) { michael@0: event_mask |= EV_READ; michael@0: } michael@0: if ((mode & WATCH_WRITE) != 0) { michael@0: event_mask |= EV_WRITE; michael@0: } michael@0: michael@0: // |should_delete_event| is true if we're modifying an event that's currently michael@0: // active in |controller|. michael@0: // If we're modifying an existing event and there's an error then we need to michael@0: // tell libevent to clean it up via event_delete() before returning. michael@0: bool should_delete_event = true; michael@0: scoped_ptr evt(controller->ReleaseEvent()); michael@0: if (evt.get() == NULL) { michael@0: should_delete_event = false; michael@0: // Ownership is transferred to the controller. michael@0: evt.reset(new event); michael@0: } michael@0: michael@0: // Set current interest mask and message pump for this event. michael@0: event_set(evt.get(), fd, event_mask, OnLibeventNotification, michael@0: delegate); michael@0: michael@0: // Tell libevent which message pump this socket will belong to when we add it. michael@0: if (event_base_set(event_base_, evt.get()) != 0) { michael@0: if (should_delete_event) { michael@0: event_del(evt.get()); michael@0: } michael@0: return false; michael@0: } michael@0: michael@0: // Add this socket to the list of monitored sockets. michael@0: if (event_add(evt.get(), NULL) != 0) { michael@0: if (should_delete_event) { michael@0: event_del(evt.get()); michael@0: } michael@0: return false; michael@0: } michael@0: michael@0: // Transfer ownership of evt to controller. michael@0: controller->Init(evt.release(), persistent); michael@0: return true; michael@0: } michael@0: michael@0: michael@0: void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, michael@0: void* context) { michael@0: Watcher* watcher = static_cast(context); michael@0: michael@0: if (flags & EV_WRITE) { michael@0: watcher->OnFileCanWriteWithoutBlocking(fd); michael@0: } michael@0: if (flags & EV_READ) { michael@0: watcher->OnFileCanReadWithoutBlocking(fd); michael@0: } michael@0: } michael@0: michael@0: michael@0: MessagePumpLibevent::SignalEvent::SignalEvent() : michael@0: event_(NULL) michael@0: { michael@0: } michael@0: michael@0: MessagePumpLibevent::SignalEvent::~SignalEvent() michael@0: { michael@0: if (event_) { michael@0: StopCatching(); michael@0: } michael@0: } michael@0: michael@0: void michael@0: MessagePumpLibevent::SignalEvent::Init(event *e) michael@0: { michael@0: DCHECK(e); michael@0: DCHECK(event_ == NULL); michael@0: event_ = e; michael@0: } michael@0: michael@0: bool michael@0: MessagePumpLibevent::SignalEvent::StopCatching() michael@0: { michael@0: // XXX/cjones: this code could be shared with michael@0: // FileDescriptorWatcher. ironic that libevent is "more" michael@0: // object-oriented than this C++ michael@0: event* e = ReleaseEvent(); michael@0: if (e == NULL) michael@0: return true; michael@0: michael@0: // event_del() is a no-op if the event isn't active. michael@0: int rv = event_del(e); michael@0: delete e; michael@0: return (rv == 0); michael@0: } michael@0: michael@0: event * michael@0: MessagePumpLibevent::SignalEvent::ReleaseEvent() michael@0: { michael@0: event *e = event_; michael@0: event_ = NULL; michael@0: return e; michael@0: } michael@0: michael@0: bool michael@0: MessagePumpLibevent::CatchSignal(int sig, michael@0: SignalEvent* sigevent, michael@0: SignalWatcher* delegate) michael@0: { michael@0: DCHECK(sig > 0); michael@0: DCHECK(sigevent); michael@0: DCHECK(delegate); michael@0: // TODO if we want to support re-using SignalEvents, this code needs michael@0: // to jump through the same hoops as WatchFileDescriptor(). Not michael@0: // needed at present michael@0: DCHECK(NULL == sigevent->event_); michael@0: michael@0: scoped_ptr evt(new event); michael@0: signal_set(evt.get(), sig, OnLibeventSignalNotification, delegate); michael@0: michael@0: if (event_base_set(event_base_, evt.get())) michael@0: return false; michael@0: michael@0: if (signal_add(evt.get(), NULL)) michael@0: return false; michael@0: michael@0: // Transfer ownership of evt to controller. michael@0: sigevent->Init(evt.release()); michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: MessagePumpLibevent::OnLibeventSignalNotification(int sig, short flags, michael@0: void* context) michael@0: { michael@0: DCHECK(sig > 0); michael@0: DCHECK(EV_SIGNAL == flags); michael@0: DCHECK(context); michael@0: reinterpret_cast(context)->OnSignal(sig); michael@0: } michael@0: michael@0: michael@0: // Reentrant! michael@0: void MessagePumpLibevent::Run(Delegate* delegate) { michael@0: DCHECK(keep_running_) << "Quit must have been called outside of Run!"; michael@0: michael@0: bool old_in_run = in_run_; michael@0: in_run_ = true; michael@0: michael@0: for (;;) { michael@0: ScopedNSAutoreleasePool autorelease_pool; michael@0: michael@0: bool did_work = delegate->DoWork(); michael@0: if (!keep_running_) michael@0: break; michael@0: michael@0: did_work |= delegate->DoDelayedWork(&delayed_work_time_); michael@0: if (!keep_running_) michael@0: break; michael@0: michael@0: if (did_work) michael@0: continue; michael@0: michael@0: did_work = delegate->DoIdleWork(); michael@0: if (!keep_running_) michael@0: break; michael@0: michael@0: if (did_work) michael@0: continue; michael@0: michael@0: // EVLOOP_ONCE tells libevent to only block once, michael@0: // but to service all pending events when it wakes up. michael@0: if (delayed_work_time_.is_null()) { michael@0: event_base_loop(event_base_, EVLOOP_ONCE); michael@0: } else { michael@0: TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); michael@0: if (delay > TimeDelta()) { michael@0: struct timeval poll_tv; michael@0: poll_tv.tv_sec = delay.InSeconds(); michael@0: poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; michael@0: event_base_loopexit(event_base_, &poll_tv); michael@0: event_base_loop(event_base_, EVLOOP_ONCE); michael@0: } else { michael@0: // It looks like delayed_work_time_ indicates a time in the past, so we michael@0: // need to call DoDelayedWork now. michael@0: delayed_work_time_ = TimeTicks(); michael@0: } michael@0: } michael@0: } michael@0: michael@0: keep_running_ = true; michael@0: in_run_ = old_in_run; michael@0: } michael@0: michael@0: void MessagePumpLibevent::Quit() { michael@0: DCHECK(in_run_); michael@0: // Tell both libevent and Run that they should break out of their loops. michael@0: keep_running_ = false; michael@0: ScheduleWork(); michael@0: } michael@0: michael@0: void MessagePumpLibevent::ScheduleWork() { michael@0: // Tell libevent (in a threadsafe way) that it should break out of its loop. michael@0: char buf = 0; michael@0: int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1)); michael@0: DCHECK(nwrite == 1 || errno == EAGAIN) michael@0: << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; michael@0: } michael@0: michael@0: void MessagePumpLibevent::ScheduleDelayedWork( michael@0: const TimeTicks& delayed_work_time) { michael@0: // We know that we can't be blocked on Wait right now since this method can michael@0: // only be called on the same thread as Run, so we only need to update our michael@0: // record of how long to sleep when we do sleep. michael@0: delayed_work_time_ = delayed_work_time; michael@0: } michael@0: michael@0: void LineWatcher::OnFileCanReadWithoutBlocking(int aFd) michael@0: { michael@0: ssize_t length = 0; michael@0: michael@0: while (true) { michael@0: length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex); michael@0: DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex)); michael@0: if (length <= 0) { michael@0: if (length < 0) { michael@0: if (errno == EINTR) { michael@0: continue; // retry system call when interrupted michael@0: } michael@0: if (errno == EAGAIN || errno == EWOULDBLOCK) { michael@0: return; // no data available: return and re-poll michael@0: } michael@0: DLOG(ERROR) << "Can't read from fd, error " << errno; michael@0: } else { michael@0: DLOG(ERROR) << "End of file"; michael@0: } michael@0: // At this point, assume that we can't actually access michael@0: // the socket anymore, and indicate an error. michael@0: OnError(); michael@0: mReceivedIndex = 0; michael@0: return; michael@0: } michael@0: michael@0: while (length-- > 0) { michael@0: DCHECK(mReceivedIndex < mBufferSize); michael@0: if (mReceiveBuffer[mReceivedIndex] == mTerminator) { michael@0: nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex); michael@0: OnLineRead(aFd, message); michael@0: if (length > 0) { michael@0: DCHECK(mReceivedIndex < (mBufferSize - 1)); michael@0: memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length); michael@0: } michael@0: mReceivedIndex = 0; michael@0: } else { michael@0: mReceivedIndex++; michael@0: } michael@0: } michael@0: } michael@0: } michael@0: } // namespace base