Wed, 31 Dec 2014 06:09:35 +0100
Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purposes.
1 // Copyright (c) 2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/message_pump_libevent.h"
7 #include <errno.h>
8 #include <fcntl.h>
9 #if defined(ANDROID) || defined(OS_POSIX)
10 #include <unistd.h>
11 #endif
13 #include "eintr_wrapper.h"
14 #include "base/logging.h"
15 #include "base/scoped_nsautorelease_pool.h"
16 #include "base/scoped_ptr.h"
17 #include "base/time.h"
18 #include "nsDependentSubstring.h"
19 #include "third_party/libevent/event.h"
21 // Lifecycle of struct event
22 // Libevent uses two main data structures:
23 // struct event_base (of which there is one per message pump), and
24 // struct event (of which there is roughly one per socket).
25 // The socket's struct event is created in
26 // MessagePumpLibevent::WatchFileDescriptor(),
27 // is owned by the FileDescriptorWatcher, and is destroyed in
28 // StopWatchingFileDescriptor().
29 // It is moved into and out of lists in struct event_base by
30 // the libevent functions event_add() and event_del().
31 //
32 // TODO(dkegel):
33 // At the moment bad things happen if a FileDescriptorWatcher
34 // is active after its MessagePumpLibevent has been destroyed.
35 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
36 // Not clear yet whether that situation occurs in practice,
37 // but if it does, we need to fix it.
39 namespace base {
// Adds O_NONBLOCK to the file status flags of |fd|.
// Returns 0 on success, otherwise the non-zero result of fcntl(F_SETFL).
// Too small a function to bother putting in a library?
static int SetNonBlocking(int fd) {
  const int current = fcntl(fd, F_GETFL, 0);
  // If the existing flags cannot be queried, start from an empty flag set.
  const int base_flags = (current == -1) ? 0 : current;
  return fcntl(fd, F_SETFL, base_flags | O_NONBLOCK);
}
50 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
51 : is_persistent_(false),
52 event_(NULL) {
53 }
55 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
56 if (event_) {
57 StopWatchingFileDescriptor();
58 }
59 }
61 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
62 bool is_persistent) {
63 DCHECK(e);
64 DCHECK(event_ == NULL);
66 is_persistent_ = is_persistent;
67 event_ = e;
68 }
70 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
71 struct event *e = event_;
72 event_ = NULL;
73 return e;
74 }
76 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
77 event* e = ReleaseEvent();
78 if (e == NULL)
79 return true;
81 // event_del() is a no-op if the event isn't active.
82 int rv = event_del(e);
83 delete e;
84 return (rv == 0);
85 }
87 // Called if a byte is received on the wakeup pipe.
88 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
89 base::MessagePumpLibevent* that =
90 static_cast<base::MessagePumpLibevent*>(context);
91 DCHECK(that->wakeup_pipe_out_ == socket);
93 // Remove and discard the wakeup byte.
94 char buf;
95 int nread = HANDLE_EINTR(read(socket, &buf, 1));
96 DCHECK_EQ(nread, 1);
97 // Tell libevent to break out of inner loop.
98 event_base_loopbreak(that->event_base_);
99 }
101 MessagePumpLibevent::MessagePumpLibevent()
102 : keep_running_(true),
103 in_run_(false),
104 event_base_(event_base_new()),
105 wakeup_pipe_in_(-1),
106 wakeup_pipe_out_(-1) {
107 if (!Init())
108 NOTREACHED();
109 }
111 bool MessagePumpLibevent::Init() {
112 int fds[2];
113 if (pipe(fds)) {
114 DLOG(ERROR) << "pipe() failed, errno: " << errno;
115 return false;
116 }
117 if (SetNonBlocking(fds[0])) {
118 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
119 return false;
120 }
121 if (SetNonBlocking(fds[1])) {
122 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
123 return false;
124 }
125 wakeup_pipe_out_ = fds[0];
126 wakeup_pipe_in_ = fds[1];
128 wakeup_event_ = new event;
129 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
130 OnWakeup, this);
131 event_base_set(event_base_, wakeup_event_);
133 if (event_add(wakeup_event_, 0))
134 return false;
135 return true;
136 }
138 MessagePumpLibevent::~MessagePumpLibevent() {
139 DCHECK(wakeup_event_);
140 DCHECK(event_base_);
141 event_del(wakeup_event_);
142 delete wakeup_event_;
143 if (wakeup_pipe_in_ >= 0)
144 close(wakeup_pipe_in_);
145 if (wakeup_pipe_out_ >= 0)
146 close(wakeup_pipe_out_);
147 event_base_free(event_base_);
148 }
150 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
151 bool persistent,
152 Mode mode,
153 FileDescriptorWatcher *controller,
154 Watcher *delegate) {
155 DCHECK(fd > 0);
156 DCHECK(controller);
157 DCHECK(delegate);
158 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
160 int event_mask = persistent ? EV_PERSIST : 0;
161 if ((mode & WATCH_READ) != 0) {
162 event_mask |= EV_READ;
163 }
164 if ((mode & WATCH_WRITE) != 0) {
165 event_mask |= EV_WRITE;
166 }
168 // |should_delete_event| is true if we're modifying an event that's currently
169 // active in |controller|.
170 // If we're modifying an existing event and there's an error then we need to
171 // tell libevent to clean it up via event_delete() before returning.
172 bool should_delete_event = true;
173 scoped_ptr<event> evt(controller->ReleaseEvent());
174 if (evt.get() == NULL) {
175 should_delete_event = false;
176 // Ownership is transferred to the controller.
177 evt.reset(new event);
178 }
180 // Set current interest mask and message pump for this event.
181 event_set(evt.get(), fd, event_mask, OnLibeventNotification,
182 delegate);
184 // Tell libevent which message pump this socket will belong to when we add it.
185 if (event_base_set(event_base_, evt.get()) != 0) {
186 if (should_delete_event) {
187 event_del(evt.get());
188 }
189 return false;
190 }
192 // Add this socket to the list of monitored sockets.
193 if (event_add(evt.get(), NULL) != 0) {
194 if (should_delete_event) {
195 event_del(evt.get());
196 }
197 return false;
198 }
200 // Transfer ownership of evt to controller.
201 controller->Init(evt.release(), persistent);
202 return true;
203 }
206 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
207 void* context) {
208 Watcher* watcher = static_cast<Watcher*>(context);
210 if (flags & EV_WRITE) {
211 watcher->OnFileCanWriteWithoutBlocking(fd);
212 }
213 if (flags & EV_READ) {
214 watcher->OnFileCanReadWithoutBlocking(fd);
215 }
216 }
219 MessagePumpLibevent::SignalEvent::SignalEvent() :
220 event_(NULL)
221 {
222 }
224 MessagePumpLibevent::SignalEvent::~SignalEvent()
225 {
226 if (event_) {
227 StopCatching();
228 }
229 }
231 void
232 MessagePumpLibevent::SignalEvent::Init(event *e)
233 {
234 DCHECK(e);
235 DCHECK(event_ == NULL);
236 event_ = e;
237 }
239 bool
240 MessagePumpLibevent::SignalEvent::StopCatching()
241 {
242 // XXX/cjones: this code could be shared with
243 // FileDescriptorWatcher. ironic that libevent is "more"
244 // object-oriented than this C++
245 event* e = ReleaseEvent();
246 if (e == NULL)
247 return true;
249 // event_del() is a no-op if the event isn't active.
250 int rv = event_del(e);
251 delete e;
252 return (rv == 0);
253 }
255 event *
256 MessagePumpLibevent::SignalEvent::ReleaseEvent()
257 {
258 event *e = event_;
259 event_ = NULL;
260 return e;
261 }
263 bool
264 MessagePumpLibevent::CatchSignal(int sig,
265 SignalEvent* sigevent,
266 SignalWatcher* delegate)
267 {
268 DCHECK(sig > 0);
269 DCHECK(sigevent);
270 DCHECK(delegate);
271 // TODO if we want to support re-using SignalEvents, this code needs
272 // to jump through the same hoops as WatchFileDescriptor(). Not
273 // needed at present
274 DCHECK(NULL == sigevent->event_);
276 scoped_ptr<event> evt(new event);
277 signal_set(evt.get(), sig, OnLibeventSignalNotification, delegate);
279 if (event_base_set(event_base_, evt.get()))
280 return false;
282 if (signal_add(evt.get(), NULL))
283 return false;
285 // Transfer ownership of evt to controller.
286 sigevent->Init(evt.release());
287 return true;
288 }
290 void
291 MessagePumpLibevent::OnLibeventSignalNotification(int sig, short flags,
292 void* context)
293 {
294 DCHECK(sig > 0);
295 DCHECK(EV_SIGNAL == flags);
296 DCHECK(context);
297 reinterpret_cast<SignalWatcher*>(context)->OnSignal(sig);
298 }
301 // Reentrant!
302 void MessagePumpLibevent::Run(Delegate* delegate) {
303 DCHECK(keep_running_) << "Quit must have been called outside of Run!";
305 bool old_in_run = in_run_;
306 in_run_ = true;
308 for (;;) {
309 ScopedNSAutoreleasePool autorelease_pool;
311 bool did_work = delegate->DoWork();
312 if (!keep_running_)
313 break;
315 did_work |= delegate->DoDelayedWork(&delayed_work_time_);
316 if (!keep_running_)
317 break;
319 if (did_work)
320 continue;
322 did_work = delegate->DoIdleWork();
323 if (!keep_running_)
324 break;
326 if (did_work)
327 continue;
329 // EVLOOP_ONCE tells libevent to only block once,
330 // but to service all pending events when it wakes up.
331 if (delayed_work_time_.is_null()) {
332 event_base_loop(event_base_, EVLOOP_ONCE);
333 } else {
334 TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
335 if (delay > TimeDelta()) {
336 struct timeval poll_tv;
337 poll_tv.tv_sec = delay.InSeconds();
338 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
339 event_base_loopexit(event_base_, &poll_tv);
340 event_base_loop(event_base_, EVLOOP_ONCE);
341 } else {
342 // It looks like delayed_work_time_ indicates a time in the past, so we
343 // need to call DoDelayedWork now.
344 delayed_work_time_ = TimeTicks();
345 }
346 }
347 }
349 keep_running_ = true;
350 in_run_ = old_in_run;
351 }
353 void MessagePumpLibevent::Quit() {
354 DCHECK(in_run_);
355 // Tell both libevent and Run that they should break out of their loops.
356 keep_running_ = false;
357 ScheduleWork();
358 }
360 void MessagePumpLibevent::ScheduleWork() {
361 // Tell libevent (in a threadsafe way) that it should break out of its loop.
362 char buf = 0;
363 int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
364 DCHECK(nwrite == 1 || errno == EAGAIN)
365 << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
366 }
368 void MessagePumpLibevent::ScheduleDelayedWork(
369 const TimeTicks& delayed_work_time) {
370 // We know that we can't be blocked on Wait right now since this method can
371 // only be called on the same thread as Run, so we only need to update our
372 // record of how long to sleep when we do sleep.
373 delayed_work_time_ = delayed_work_time;
374 }
376 void LineWatcher::OnFileCanReadWithoutBlocking(int aFd)
377 {
378 ssize_t length = 0;
380 while (true) {
381 length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex);
382 DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex));
383 if (length <= 0) {
384 if (length < 0) {
385 if (errno == EINTR) {
386 continue; // retry system call when interrupted
387 }
388 if (errno == EAGAIN || errno == EWOULDBLOCK) {
389 return; // no data available: return and re-poll
390 }
391 DLOG(ERROR) << "Can't read from fd, error " << errno;
392 } else {
393 DLOG(ERROR) << "End of file";
394 }
395 // At this point, assume that we can't actually access
396 // the socket anymore, and indicate an error.
397 OnError();
398 mReceivedIndex = 0;
399 return;
400 }
402 while (length-- > 0) {
403 DCHECK(mReceivedIndex < mBufferSize);
404 if (mReceiveBuffer[mReceivedIndex] == mTerminator) {
405 nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex);
406 OnLineRead(aFd, message);
407 if (length > 0) {
408 DCHECK(mReceivedIndex < (mBufferSize - 1));
409 memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length);
410 }
411 mReceivedIndex = 0;
412 } else {
413 mReceivedIndex++;
414 }
415 }
416 }
417 }
418 } // namespace base