1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/ipc/chromium/src/base/message_pump_libevent.cc Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,418 @@ 1.4 +// Copyright (c) 2008 The Chromium Authors. All rights reserved. 1.5 +// Use of this source code is governed by a BSD-style license that can be 1.6 +// found in the LICENSE file. 1.7 + 1.8 +#include "base/message_pump_libevent.h" 1.9 + 1.10 +#include <errno.h> 1.11 +#include <fcntl.h> 1.12 +#if defined(ANDROID) || defined(OS_POSIX) 1.13 +#include <unistd.h> 1.14 +#endif 1.15 + 1.16 +#include "eintr_wrapper.h" 1.17 +#include "base/logging.h" 1.18 +#include "base/scoped_nsautorelease_pool.h" 1.19 +#include "base/scoped_ptr.h" 1.20 +#include "base/time.h" 1.21 +#include "nsDependentSubstring.h" 1.22 +#include "third_party/libevent/event.h" 1.23 + 1.24 +// Lifecycle of struct event 1.25 +// Libevent uses two main data structures: 1.26 +// struct event_base (of which there is one per message pump), and 1.27 +// struct event (of which there is roughly one per socket). 1.28 +// The socket's struct event is created in 1.29 +// MessagePumpLibevent::WatchFileDescriptor(), 1.30 +// is owned by the FileDescriptorWatcher, and is destroyed in 1.31 +// StopWatchingFileDescriptor(). 1.32 +// It is moved into and out of lists in struct event_base by 1.33 +// the libevent functions event_add() and event_del(). 1.34 +// 1.35 +// TODO(dkegel): 1.36 +// At the moment bad things happen if a FileDescriptorWatcher 1.37 +// is active after its MessagePumpLibevent has been destroyed. 1.38 +// See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop 1.39 +// Not clear yet whether that situation occurs in practice, 1.40 +// but if it does, we need to fix it. 1.41 + 1.42 +namespace base { 1.43 + 1.44 +// Return 0 on success 1.45 +// Too small a function to bother putting in a library? 1.46 +static int SetNonBlocking(int fd) { 1.47 + int flags = fcntl(fd, F_GETFL, 0); 1.48 + if (flags == -1) 1.49 + flags = 0; 1.50 + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); 1.51 +} 1.52 + 1.53 +MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() 1.54 + : is_persistent_(false), 1.55 + event_(NULL) { 1.56 +} 1.57 + 1.58 +MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { 1.59 + if (event_) { 1.60 + StopWatchingFileDescriptor(); 1.61 + } 1.62 +} 1.63 + 1.64 +void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, 1.65 + bool is_persistent) { 1.66 + DCHECK(e); 1.67 + DCHECK(event_ == NULL); 1.68 + 1.69 + is_persistent_ = is_persistent; 1.70 + event_ = e; 1.71 +} 1.72 + 1.73 +event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { 1.74 + struct event *e = event_; 1.75 + event_ = NULL; 1.76 + return e; 1.77 +} 1.78 + 1.79 +bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { 1.80 + event* e = ReleaseEvent(); 1.81 + if (e == NULL) 1.82 + return true; 1.83 + 1.84 + // event_del() is a no-op if the event isn't active. 1.85 + int rv = event_del(e); 1.86 + delete e; 1.87 + return (rv == 0); 1.88 +} 1.89 + 1.90 +// Called if a byte is received on the wakeup pipe. 1.91 +void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { 1.92 + base::MessagePumpLibevent* that = 1.93 + static_cast<base::MessagePumpLibevent*>(context); 1.94 + DCHECK(that->wakeup_pipe_out_ == socket); 1.95 + 1.96 + // Remove and discard the wakeup byte. 1.97 + char buf; 1.98 + int nread = HANDLE_EINTR(read(socket, &buf, 1)); 1.99 + DCHECK_EQ(nread, 1); 1.100 + // Tell libevent to break out of inner loop. 1.101 + event_base_loopbreak(that->event_base_); 1.102 +} 1.103 + 1.104 +MessagePumpLibevent::MessagePumpLibevent() 1.105 + : keep_running_(true), 1.106 + in_run_(false), 1.107 + event_base_(event_base_new()), 1.108 + wakeup_pipe_in_(-1), 1.109 + wakeup_pipe_out_(-1) { 1.110 + if (!Init()) 1.111 + NOTREACHED(); 1.112 +} 1.113 + 1.114 +bool MessagePumpLibevent::Init() { 1.115 + int fds[2]; 1.116 + if (pipe(fds)) { 1.117 + DLOG(ERROR) << "pipe() failed, errno: " << errno; 1.118 + return false; 1.119 + } 1.120 + if (SetNonBlocking(fds[0])) { 1.121 + DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; 1.122 + return false; 1.123 + } 1.124 + if (SetNonBlocking(fds[1])) { 1.125 + DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; 1.126 + return false; 1.127 + } 1.128 + wakeup_pipe_out_ = fds[0]; 1.129 + wakeup_pipe_in_ = fds[1]; 1.130 + 1.131 + wakeup_event_ = new event; 1.132 + event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, 1.133 + OnWakeup, this); 1.134 + event_base_set(event_base_, wakeup_event_); 1.135 + 1.136 + if (event_add(wakeup_event_, 0)) 1.137 + return false; 1.138 + return true; 1.139 +} 1.140 + 1.141 +MessagePumpLibevent::~MessagePumpLibevent() { 1.142 + DCHECK(wakeup_event_); 1.143 + DCHECK(event_base_); 1.144 + event_del(wakeup_event_); 1.145 + delete wakeup_event_; 1.146 + if (wakeup_pipe_in_ >= 0) 1.147 + close(wakeup_pipe_in_); 1.148 + if (wakeup_pipe_out_ >= 0) 1.149 + close(wakeup_pipe_out_); 1.150 + event_base_free(event_base_); 1.151 +} 1.152 + 1.153 +bool MessagePumpLibevent::WatchFileDescriptor(int fd, 1.154 + bool persistent, 1.155 + Mode mode, 1.156 + FileDescriptorWatcher *controller, 1.157 + Watcher *delegate) { 1.158 + DCHECK(fd > 0); 1.159 + DCHECK(controller); 1.160 + DCHECK(delegate); 1.161 + DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); 1.162 + 1.163 + int event_mask = persistent ? EV_PERSIST : 0; 1.164 + if ((mode & WATCH_READ) != 0) { 1.165 + event_mask |= EV_READ; 1.166 + } 1.167 + if ((mode & WATCH_WRITE) != 0) { 1.168 + event_mask |= EV_WRITE; 1.169 + } 1.170 + 1.171 + // |should_delete_event| is true if we're modifying an event that's currently 1.172 + // active in |controller|. 1.173 + // If we're modifying an existing event and there's an error then we need to 1.174 + // tell libevent to clean it up via event_delete() before returning. 1.175 + bool should_delete_event = true; 1.176 + scoped_ptr<event> evt(controller->ReleaseEvent()); 1.177 + if (evt.get() == NULL) { 1.178 + should_delete_event = false; 1.179 + // Ownership is transferred to the controller. 1.180 + evt.reset(new event); 1.181 + } 1.182 + 1.183 + // Set current interest mask and message pump for this event. 1.184 + event_set(evt.get(), fd, event_mask, OnLibeventNotification, 1.185 + delegate); 1.186 + 1.187 + // Tell libevent which message pump this socket will belong to when we add it. 1.188 + if (event_base_set(event_base_, evt.get()) != 0) { 1.189 + if (should_delete_event) { 1.190 + event_del(evt.get()); 1.191 + } 1.192 + return false; 1.193 + } 1.194 + 1.195 + // Add this socket to the list of monitored sockets. 1.196 + if (event_add(evt.get(), NULL) != 0) { 1.197 + if (should_delete_event) { 1.198 + event_del(evt.get()); 1.199 + } 1.200 + return false; 1.201 + } 1.202 + 1.203 + // Transfer ownership of evt to controller. 1.204 + controller->Init(evt.release(), persistent); 1.205 + return true; 1.206 +} 1.207 + 1.208 + 1.209 +void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, 1.210 + void* context) { 1.211 + Watcher* watcher = static_cast<Watcher*>(context); 1.212 + 1.213 + if (flags & EV_WRITE) { 1.214 + watcher->OnFileCanWriteWithoutBlocking(fd); 1.215 + } 1.216 + if (flags & EV_READ) { 1.217 + watcher->OnFileCanReadWithoutBlocking(fd); 1.218 + } 1.219 +} 1.220 + 1.221 + 1.222 +MessagePumpLibevent::SignalEvent::SignalEvent() : 1.223 + event_(NULL) 1.224 +{ 1.225 +} 1.226 + 1.227 +MessagePumpLibevent::SignalEvent::~SignalEvent() 1.228 +{ 1.229 + if (event_) { 1.230 + StopCatching(); 1.231 + } 1.232 +} 1.233 + 1.234 +void 1.235 +MessagePumpLibevent::SignalEvent::Init(event *e) 1.236 +{ 1.237 + DCHECK(e); 1.238 + DCHECK(event_ == NULL); 1.239 + event_ = e; 1.240 +} 1.241 + 1.242 +bool 1.243 +MessagePumpLibevent::SignalEvent::StopCatching() 1.244 +{ 1.245 + // XXX/cjones: this code could be shared with 1.246 + // FileDescriptorWatcher. ironic that libevent is "more" 1.247 + // object-oriented than this C++ 1.248 + event* e = ReleaseEvent(); 1.249 + if (e == NULL) 1.250 + return true; 1.251 + 1.252 + // event_del() is a no-op if the event isn't active. 1.253 + int rv = event_del(e); 1.254 + delete e; 1.255 + return (rv == 0); 1.256 +} 1.257 + 1.258 +event * 1.259 +MessagePumpLibevent::SignalEvent::ReleaseEvent() 1.260 +{ 1.261 + event *e = event_; 1.262 + event_ = NULL; 1.263 + return e; 1.264 +} 1.265 + 1.266 +bool 1.267 +MessagePumpLibevent::CatchSignal(int sig, 1.268 + SignalEvent* sigevent, 1.269 + SignalWatcher* delegate) 1.270 +{ 1.271 + DCHECK(sig > 0); 1.272 + DCHECK(sigevent); 1.273 + DCHECK(delegate); 1.274 + // TODO if we want to support re-using SignalEvents, this code needs 1.275 + // to jump through the same hoops as WatchFileDescriptor(). Not 1.276 + // needed at present 1.277 + DCHECK(NULL == sigevent->event_); 1.278 + 1.279 + scoped_ptr<event> evt(new event); 1.280 + signal_set(evt.get(), sig, OnLibeventSignalNotification, delegate); 1.281 + 1.282 + if (event_base_set(event_base_, evt.get())) 1.283 + return false; 1.284 + 1.285 + if (signal_add(evt.get(), NULL)) 1.286 + return false; 1.287 + 1.288 + // Transfer ownership of evt to controller. 1.289 + sigevent->Init(evt.release()); 1.290 + return true; 1.291 +} 1.292 + 1.293 +void 1.294 +MessagePumpLibevent::OnLibeventSignalNotification(int sig, short flags, 1.295 + void* context) 1.296 +{ 1.297 + DCHECK(sig > 0); 1.298 + DCHECK(EV_SIGNAL == flags); 1.299 + DCHECK(context); 1.300 + reinterpret_cast<SignalWatcher*>(context)->OnSignal(sig); 1.301 +} 1.302 + 1.303 + 1.304 +// Reentrant! 1.305 +void MessagePumpLibevent::Run(Delegate* delegate) { 1.306 + DCHECK(keep_running_) << "Quit must have been called outside of Run!"; 1.307 + 1.308 + bool old_in_run = in_run_; 1.309 + in_run_ = true; 1.310 + 1.311 + for (;;) { 1.312 + ScopedNSAutoreleasePool autorelease_pool; 1.313 + 1.314 + bool did_work = delegate->DoWork(); 1.315 + if (!keep_running_) 1.316 + break; 1.317 + 1.318 + did_work |= delegate->DoDelayedWork(&delayed_work_time_); 1.319 + if (!keep_running_) 1.320 + break; 1.321 + 1.322 + if (did_work) 1.323 + continue; 1.324 + 1.325 + did_work = delegate->DoIdleWork(); 1.326 + if (!keep_running_) 1.327 + break; 1.328 + 1.329 + if (did_work) 1.330 + continue; 1.331 + 1.332 + // EVLOOP_ONCE tells libevent to only block once, 1.333 + // but to service all pending events when it wakes up. 1.334 + if (delayed_work_time_.is_null()) { 1.335 + event_base_loop(event_base_, EVLOOP_ONCE); 1.336 + } else { 1.337 + TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); 1.338 + if (delay > TimeDelta()) { 1.339 + struct timeval poll_tv; 1.340 + poll_tv.tv_sec = delay.InSeconds(); 1.341 + poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; 1.342 + event_base_loopexit(event_base_, &poll_tv); 1.343 + event_base_loop(event_base_, EVLOOP_ONCE); 1.344 + } else { 1.345 + // It looks like delayed_work_time_ indicates a time in the past, so we 1.346 + // need to call DoDelayedWork now. 1.347 + delayed_work_time_ = TimeTicks(); 1.348 + } 1.349 + } 1.350 + } 1.351 + 1.352 + keep_running_ = true; 1.353 + in_run_ = old_in_run; 1.354 +} 1.355 + 1.356 +void MessagePumpLibevent::Quit() { 1.357 + DCHECK(in_run_); 1.358 + // Tell both libevent and Run that they should break out of their loops. 1.359 + keep_running_ = false; 1.360 + ScheduleWork(); 1.361 +} 1.362 + 1.363 +void MessagePumpLibevent::ScheduleWork() { 1.364 + // Tell libevent (in a threadsafe way) that it should break out of its loop. 1.365 + char buf = 0; 1.366 + int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1)); 1.367 + DCHECK(nwrite == 1 || errno == EAGAIN) 1.368 + << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; 1.369 +} 1.370 + 1.371 +void MessagePumpLibevent::ScheduleDelayedWork( 1.372 + const TimeTicks& delayed_work_time) { 1.373 + // We know that we can't be blocked on Wait right now since this method can 1.374 + // only be called on the same thread as Run, so we only need to update our 1.375 + // record of how long to sleep when we do sleep. 1.376 + delayed_work_time_ = delayed_work_time; 1.377 +} 1.378 + 1.379 +void LineWatcher::OnFileCanReadWithoutBlocking(int aFd) 1.380 +{ 1.381 + ssize_t length = 0; 1.382 + 1.383 + while (true) { 1.384 + length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex); 1.385 + DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex)); 1.386 + if (length <= 0) { 1.387 + if (length < 0) { 1.388 + if (errno == EINTR) { 1.389 + continue; // retry system call when interrupted 1.390 + } 1.391 + if (errno == EAGAIN || errno == EWOULDBLOCK) { 1.392 + return; // no data available: return and re-poll 1.393 + } 1.394 + DLOG(ERROR) << "Can't read from fd, error " << errno; 1.395 + } else { 1.396 + DLOG(ERROR) << "End of file"; 1.397 + } 1.398 + // At this point, assume that we can't actually access 1.399 + // the socket anymore, and indicate an error. 1.400 + OnError(); 1.401 + mReceivedIndex = 0; 1.402 + return; 1.403 + } 1.404 + 1.405 + while (length-- > 0) { 1.406 + DCHECK(mReceivedIndex < mBufferSize); 1.407 + if (mReceiveBuffer[mReceivedIndex] == mTerminator) { 1.408 + nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex); 1.409 + OnLineRead(aFd, message); 1.410 + if (length > 0) { 1.411 + DCHECK(mReceivedIndex < (mBufferSize - 1)); 1.412 + memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length); 1.413 + } 1.414 + mReceivedIndex = 0; 1.415 + } else { 1.416 + mReceivedIndex++; 1.417 + } 1.418 + } 1.419 + } 1.420 +} 1.421 +} // namespace base