|
1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. |
|
2 // Use of this source code is governed by a BSD-style license that can be |
|
3 // found in the LICENSE file. |
|
4 |
|
5 #include "base/message_pump_libevent.h" |
|
6 |
|
7 #include <errno.h> |
|
8 #include <fcntl.h> |
|
9 #if defined(ANDROID) || defined(OS_POSIX) |
|
10 #include <unistd.h> |
|
11 #endif |
|
12 |
|
13 #include "eintr_wrapper.h" |
|
14 #include "base/logging.h" |
|
15 #include "base/scoped_nsautorelease_pool.h" |
|
16 #include "base/scoped_ptr.h" |
|
17 #include "base/time.h" |
|
18 #include "nsDependentSubstring.h" |
|
19 #include "third_party/libevent/event.h" |
|
20 |
|
21 // Lifecycle of struct event |
|
22 // Libevent uses two main data structures: |
|
23 // struct event_base (of which there is one per message pump), and |
|
24 // struct event (of which there is roughly one per socket). |
|
25 // The socket's struct event is created in |
|
26 // MessagePumpLibevent::WatchFileDescriptor(), |
|
27 // is owned by the FileDescriptorWatcher, and is destroyed in |
|
28 // StopWatchingFileDescriptor(). |
|
29 // It is moved into and out of lists in struct event_base by |
|
30 // the libevent functions event_add() and event_del(). |
|
31 // |
|
32 // TODO(dkegel): |
|
33 // At the moment bad things happen if a FileDescriptorWatcher |
|
34 // is active after its MessagePumpLibevent has been destroyed. |
|
35 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop |
|
36 // Not clear yet whether that situation occurs in practice, |
|
37 // but if it does, we need to fix it. |
|
38 |
|
39 namespace base { |
|
40 |
|
41 // Return 0 on success |
|
42 // Too small a function to bother putting in a library? |
|
43 static int SetNonBlocking(int fd) { |
|
44 int flags = fcntl(fd, F_GETFL, 0); |
|
45 if (flags == -1) |
|
46 flags = 0; |
|
47 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); |
|
48 } |
|
49 |
|
50 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() |
|
51 : is_persistent_(false), |
|
52 event_(NULL) { |
|
53 } |
|
54 |
|
55 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { |
|
56 if (event_) { |
|
57 StopWatchingFileDescriptor(); |
|
58 } |
|
59 } |
|
60 |
|
61 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, |
|
62 bool is_persistent) { |
|
63 DCHECK(e); |
|
64 DCHECK(event_ == NULL); |
|
65 |
|
66 is_persistent_ = is_persistent; |
|
67 event_ = e; |
|
68 } |
|
69 |
|
70 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { |
|
71 struct event *e = event_; |
|
72 event_ = NULL; |
|
73 return e; |
|
74 } |
|
75 |
|
76 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { |
|
77 event* e = ReleaseEvent(); |
|
78 if (e == NULL) |
|
79 return true; |
|
80 |
|
81 // event_del() is a no-op if the event isn't active. |
|
82 int rv = event_del(e); |
|
83 delete e; |
|
84 return (rv == 0); |
|
85 } |
|
86 |
|
87 // Called if a byte is received on the wakeup pipe. |
|
88 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { |
|
89 base::MessagePumpLibevent* that = |
|
90 static_cast<base::MessagePumpLibevent*>(context); |
|
91 DCHECK(that->wakeup_pipe_out_ == socket); |
|
92 |
|
93 // Remove and discard the wakeup byte. |
|
94 char buf; |
|
95 int nread = HANDLE_EINTR(read(socket, &buf, 1)); |
|
96 DCHECK_EQ(nread, 1); |
|
97 // Tell libevent to break out of inner loop. |
|
98 event_base_loopbreak(that->event_base_); |
|
99 } |
|
100 |
|
101 MessagePumpLibevent::MessagePumpLibevent() |
|
102 : keep_running_(true), |
|
103 in_run_(false), |
|
104 event_base_(event_base_new()), |
|
105 wakeup_pipe_in_(-1), |
|
106 wakeup_pipe_out_(-1) { |
|
107 if (!Init()) |
|
108 NOTREACHED(); |
|
109 } |
|
110 |
|
111 bool MessagePumpLibevent::Init() { |
|
112 int fds[2]; |
|
113 if (pipe(fds)) { |
|
114 DLOG(ERROR) << "pipe() failed, errno: " << errno; |
|
115 return false; |
|
116 } |
|
117 if (SetNonBlocking(fds[0])) { |
|
118 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; |
|
119 return false; |
|
120 } |
|
121 if (SetNonBlocking(fds[1])) { |
|
122 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; |
|
123 return false; |
|
124 } |
|
125 wakeup_pipe_out_ = fds[0]; |
|
126 wakeup_pipe_in_ = fds[1]; |
|
127 |
|
128 wakeup_event_ = new event; |
|
129 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, |
|
130 OnWakeup, this); |
|
131 event_base_set(event_base_, wakeup_event_); |
|
132 |
|
133 if (event_add(wakeup_event_, 0)) |
|
134 return false; |
|
135 return true; |
|
136 } |
|
137 |
|
138 MessagePumpLibevent::~MessagePumpLibevent() { |
|
139 DCHECK(wakeup_event_); |
|
140 DCHECK(event_base_); |
|
141 event_del(wakeup_event_); |
|
142 delete wakeup_event_; |
|
143 if (wakeup_pipe_in_ >= 0) |
|
144 close(wakeup_pipe_in_); |
|
145 if (wakeup_pipe_out_ >= 0) |
|
146 close(wakeup_pipe_out_); |
|
147 event_base_free(event_base_); |
|
148 } |
|
149 |
|
150 bool MessagePumpLibevent::WatchFileDescriptor(int fd, |
|
151 bool persistent, |
|
152 Mode mode, |
|
153 FileDescriptorWatcher *controller, |
|
154 Watcher *delegate) { |
|
155 DCHECK(fd > 0); |
|
156 DCHECK(controller); |
|
157 DCHECK(delegate); |
|
158 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); |
|
159 |
|
160 int event_mask = persistent ? EV_PERSIST : 0; |
|
161 if ((mode & WATCH_READ) != 0) { |
|
162 event_mask |= EV_READ; |
|
163 } |
|
164 if ((mode & WATCH_WRITE) != 0) { |
|
165 event_mask |= EV_WRITE; |
|
166 } |
|
167 |
|
168 // |should_delete_event| is true if we're modifying an event that's currently |
|
169 // active in |controller|. |
|
170 // If we're modifying an existing event and there's an error then we need to |
|
171 // tell libevent to clean it up via event_delete() before returning. |
|
172 bool should_delete_event = true; |
|
173 scoped_ptr<event> evt(controller->ReleaseEvent()); |
|
174 if (evt.get() == NULL) { |
|
175 should_delete_event = false; |
|
176 // Ownership is transferred to the controller. |
|
177 evt.reset(new event); |
|
178 } |
|
179 |
|
180 // Set current interest mask and message pump for this event. |
|
181 event_set(evt.get(), fd, event_mask, OnLibeventNotification, |
|
182 delegate); |
|
183 |
|
184 // Tell libevent which message pump this socket will belong to when we add it. |
|
185 if (event_base_set(event_base_, evt.get()) != 0) { |
|
186 if (should_delete_event) { |
|
187 event_del(evt.get()); |
|
188 } |
|
189 return false; |
|
190 } |
|
191 |
|
192 // Add this socket to the list of monitored sockets. |
|
193 if (event_add(evt.get(), NULL) != 0) { |
|
194 if (should_delete_event) { |
|
195 event_del(evt.get()); |
|
196 } |
|
197 return false; |
|
198 } |
|
199 |
|
200 // Transfer ownership of evt to controller. |
|
201 controller->Init(evt.release(), persistent); |
|
202 return true; |
|
203 } |
|
204 |
|
205 |
|
206 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, |
|
207 void* context) { |
|
208 Watcher* watcher = static_cast<Watcher*>(context); |
|
209 |
|
210 if (flags & EV_WRITE) { |
|
211 watcher->OnFileCanWriteWithoutBlocking(fd); |
|
212 } |
|
213 if (flags & EV_READ) { |
|
214 watcher->OnFileCanReadWithoutBlocking(fd); |
|
215 } |
|
216 } |
|
217 |
|
218 |
|
219 MessagePumpLibevent::SignalEvent::SignalEvent() : |
|
220 event_(NULL) |
|
221 { |
|
222 } |
|
223 |
|
224 MessagePumpLibevent::SignalEvent::~SignalEvent() |
|
225 { |
|
226 if (event_) { |
|
227 StopCatching(); |
|
228 } |
|
229 } |
|
230 |
|
231 void |
|
232 MessagePumpLibevent::SignalEvent::Init(event *e) |
|
233 { |
|
234 DCHECK(e); |
|
235 DCHECK(event_ == NULL); |
|
236 event_ = e; |
|
237 } |
|
238 |
|
239 bool |
|
240 MessagePumpLibevent::SignalEvent::StopCatching() |
|
241 { |
|
242 // XXX/cjones: this code could be shared with |
|
243 // FileDescriptorWatcher. ironic that libevent is "more" |
|
244 // object-oriented than this C++ |
|
245 event* e = ReleaseEvent(); |
|
246 if (e == NULL) |
|
247 return true; |
|
248 |
|
249 // event_del() is a no-op if the event isn't active. |
|
250 int rv = event_del(e); |
|
251 delete e; |
|
252 return (rv == 0); |
|
253 } |
|
254 |
|
255 event * |
|
256 MessagePumpLibevent::SignalEvent::ReleaseEvent() |
|
257 { |
|
258 event *e = event_; |
|
259 event_ = NULL; |
|
260 return e; |
|
261 } |
|
262 |
|
263 bool |
|
264 MessagePumpLibevent::CatchSignal(int sig, |
|
265 SignalEvent* sigevent, |
|
266 SignalWatcher* delegate) |
|
267 { |
|
268 DCHECK(sig > 0); |
|
269 DCHECK(sigevent); |
|
270 DCHECK(delegate); |
|
271 // TODO if we want to support re-using SignalEvents, this code needs |
|
272 // to jump through the same hoops as WatchFileDescriptor(). Not |
|
273 // needed at present |
|
274 DCHECK(NULL == sigevent->event_); |
|
275 |
|
276 scoped_ptr<event> evt(new event); |
|
277 signal_set(evt.get(), sig, OnLibeventSignalNotification, delegate); |
|
278 |
|
279 if (event_base_set(event_base_, evt.get())) |
|
280 return false; |
|
281 |
|
282 if (signal_add(evt.get(), NULL)) |
|
283 return false; |
|
284 |
|
285 // Transfer ownership of evt to controller. |
|
286 sigevent->Init(evt.release()); |
|
287 return true; |
|
288 } |
|
289 |
|
290 void |
|
291 MessagePumpLibevent::OnLibeventSignalNotification(int sig, short flags, |
|
292 void* context) |
|
293 { |
|
294 DCHECK(sig > 0); |
|
295 DCHECK(EV_SIGNAL == flags); |
|
296 DCHECK(context); |
|
297 reinterpret_cast<SignalWatcher*>(context)->OnSignal(sig); |
|
298 } |
|
299 |
|
300 |
|
301 // Reentrant! |
|
302 void MessagePumpLibevent::Run(Delegate* delegate) { |
|
303 DCHECK(keep_running_) << "Quit must have been called outside of Run!"; |
|
304 |
|
305 bool old_in_run = in_run_; |
|
306 in_run_ = true; |
|
307 |
|
308 for (;;) { |
|
309 ScopedNSAutoreleasePool autorelease_pool; |
|
310 |
|
311 bool did_work = delegate->DoWork(); |
|
312 if (!keep_running_) |
|
313 break; |
|
314 |
|
315 did_work |= delegate->DoDelayedWork(&delayed_work_time_); |
|
316 if (!keep_running_) |
|
317 break; |
|
318 |
|
319 if (did_work) |
|
320 continue; |
|
321 |
|
322 did_work = delegate->DoIdleWork(); |
|
323 if (!keep_running_) |
|
324 break; |
|
325 |
|
326 if (did_work) |
|
327 continue; |
|
328 |
|
329 // EVLOOP_ONCE tells libevent to only block once, |
|
330 // but to service all pending events when it wakes up. |
|
331 if (delayed_work_time_.is_null()) { |
|
332 event_base_loop(event_base_, EVLOOP_ONCE); |
|
333 } else { |
|
334 TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); |
|
335 if (delay > TimeDelta()) { |
|
336 struct timeval poll_tv; |
|
337 poll_tv.tv_sec = delay.InSeconds(); |
|
338 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; |
|
339 event_base_loopexit(event_base_, &poll_tv); |
|
340 event_base_loop(event_base_, EVLOOP_ONCE); |
|
341 } else { |
|
342 // It looks like delayed_work_time_ indicates a time in the past, so we |
|
343 // need to call DoDelayedWork now. |
|
344 delayed_work_time_ = TimeTicks(); |
|
345 } |
|
346 } |
|
347 } |
|
348 |
|
349 keep_running_ = true; |
|
350 in_run_ = old_in_run; |
|
351 } |
|
352 |
|
353 void MessagePumpLibevent::Quit() { |
|
354 DCHECK(in_run_); |
|
355 // Tell both libevent and Run that they should break out of their loops. |
|
356 keep_running_ = false; |
|
357 ScheduleWork(); |
|
358 } |
|
359 |
|
360 void MessagePumpLibevent::ScheduleWork() { |
|
361 // Tell libevent (in a threadsafe way) that it should break out of its loop. |
|
362 char buf = 0; |
|
363 int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1)); |
|
364 DCHECK(nwrite == 1 || errno == EAGAIN) |
|
365 << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; |
|
366 } |
|
367 |
|
368 void MessagePumpLibevent::ScheduleDelayedWork( |
|
369 const TimeTicks& delayed_work_time) { |
|
370 // We know that we can't be blocked on Wait right now since this method can |
|
371 // only be called on the same thread as Run, so we only need to update our |
|
372 // record of how long to sleep when we do sleep. |
|
373 delayed_work_time_ = delayed_work_time; |
|
374 } |
|
375 |
|
376 void LineWatcher::OnFileCanReadWithoutBlocking(int aFd) |
|
377 { |
|
378 ssize_t length = 0; |
|
379 |
|
380 while (true) { |
|
381 length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex); |
|
382 DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex)); |
|
383 if (length <= 0) { |
|
384 if (length < 0) { |
|
385 if (errno == EINTR) { |
|
386 continue; // retry system call when interrupted |
|
387 } |
|
388 if (errno == EAGAIN || errno == EWOULDBLOCK) { |
|
389 return; // no data available: return and re-poll |
|
390 } |
|
391 DLOG(ERROR) << "Can't read from fd, error " << errno; |
|
392 } else { |
|
393 DLOG(ERROR) << "End of file"; |
|
394 } |
|
395 // At this point, assume that we can't actually access |
|
396 // the socket anymore, and indicate an error. |
|
397 OnError(); |
|
398 mReceivedIndex = 0; |
|
399 return; |
|
400 } |
|
401 |
|
402 while (length-- > 0) { |
|
403 DCHECK(mReceivedIndex < mBufferSize); |
|
404 if (mReceiveBuffer[mReceivedIndex] == mTerminator) { |
|
405 nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex); |
|
406 OnLineRead(aFd, message); |
|
407 if (length > 0) { |
|
408 DCHECK(mReceivedIndex < (mBufferSize - 1)); |
|
409 memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length); |
|
410 } |
|
411 mReceivedIndex = 0; |
|
412 } else { |
|
413 mReceivedIndex++; |
|
414 } |
|
415 } |
|
416 } |
|
417 } |
|
418 } // namespace base |