1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/ipc/unixsocket/UnixSocket.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,921 @@ 1.4 +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ 1.5 +/* vim: set ts=2 et sw=2 tw=80: */ 1.6 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.7 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.8 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.9 + 1.10 +#include "UnixSocket.h" 1.11 +#include "nsTArray.h" 1.12 +#include "nsXULAppAPI.h" 1.13 +#include <fcntl.h> 1.14 + 1.15 +#ifdef MOZ_TASK_TRACER 1.16 +#include "GeckoTaskTracer.h" 1.17 +using namespace mozilla::tasktracer; 1.18 +#endif 1.19 + 1.20 +static const size_t MAX_READ_SIZE = 1 << 16; 1.21 + 1.22 +namespace mozilla { 1.23 +namespace ipc { 1.24 + 1.25 +class UnixSocketImpl : public UnixSocketWatcher 1.26 +{ 1.27 +public: 1.28 + UnixSocketImpl(MessageLoop* mIOLoop, 1.29 + UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector, 1.30 + const nsACString& aAddress) 1.31 + : UnixSocketWatcher(mIOLoop) 1.32 + , mConsumer(aConsumer) 1.33 + , mConnector(aConnector) 1.34 + , mShuttingDownOnIOThread(false) 1.35 + , mAddress(aAddress) 1.36 + , mDelayedConnectTask(nullptr) 1.37 + { 1.38 + } 1.39 + 1.40 + ~UnixSocketImpl() 1.41 + { 1.42 + MOZ_ASSERT(NS_IsMainThread()); 1.43 + MOZ_ASSERT(IsShutdownOnMainThread()); 1.44 + } 1.45 + 1.46 + void QueueWriteData(UnixSocketRawData* aData) 1.47 + { 1.48 + mOutgoingQ.AppendElement(aData); 1.49 + AddWatchers(WRITE_WATCHER, false); 1.50 + } 1.51 + 1.52 + bool IsShutdownOnMainThread() 1.53 + { 1.54 + MOZ_ASSERT(NS_IsMainThread()); 1.55 + return mConsumer == nullptr; 1.56 + } 1.57 + 1.58 + void ShutdownOnMainThread() 1.59 + { 1.60 + MOZ_ASSERT(NS_IsMainThread()); 1.61 + MOZ_ASSERT(!IsShutdownOnMainThread()); 1.62 + mConsumer = nullptr; 1.63 + } 1.64 + 1.65 + bool IsShutdownOnIOThread() 1.66 + { 1.67 + return mShuttingDownOnIOThread; 1.68 + } 1.69 + 1.70 + void ShutdownOnIOThread() 1.71 + { 1.72 + MOZ_ASSERT(!NS_IsMainThread()); 1.73 + MOZ_ASSERT(!mShuttingDownOnIOThread); 1.74 + 1.75 + Close(); // will also remove fd from I/O loop 1.76 + mShuttingDownOnIOThread = true; 1.77 + } 1.78 + 1.79 + void SetDelayedConnectTask(CancelableTask* aTask) 1.80 + { 1.81 + MOZ_ASSERT(NS_IsMainThread()); 1.82 + mDelayedConnectTask = aTask; 1.83 + } 1.84 + 1.85 + void ClearDelayedConnectTask() 1.86 + { 1.87 + MOZ_ASSERT(NS_IsMainThread()); 1.88 + mDelayedConnectTask = nullptr; 1.89 + } 1.90 + 1.91 + void CancelDelayedConnectTask() 1.92 + { 1.93 + MOZ_ASSERT(NS_IsMainThread()); 1.94 + if (!mDelayedConnectTask) { 1.95 + return; 1.96 + } 1.97 + mDelayedConnectTask->Cancel(); 1.98 + ClearDelayedConnectTask(); 1.99 + } 1.100 + 1.101 + /** 1.102 + * Connect to a socket 1.103 + */ 1.104 + void Connect(); 1.105 + 1.106 + /** 1.107 + * Run bind/listen to prepare for further runs of accept() 1.108 + */ 1.109 + void Listen(); 1.110 + 1.111 + void GetSocketAddr(nsAString& aAddrStr) 1.112 + { 1.113 + if (!mConnector) { 1.114 + NS_WARNING("No connector to get socket address from!"); 1.115 + aAddrStr.Truncate(); 1.116 + return; 1.117 + } 1.118 + mConnector->GetSocketAddr(mAddr, aAddrStr); 1.119 + } 1.120 + 1.121 + /** 1.122 + * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated 1.123 + * directly from main thread. All non-main-thread accesses should happen with 1.124 + * mImpl as container. 1.125 + */ 1.126 + RefPtr<UnixSocketConsumer> mConsumer; 1.127 + 1.128 + void OnAccepted(int aFd, const sockaddr_any* aAddr, 1.129 + socklen_t aAddrLen) MOZ_OVERRIDE; 1.130 + void OnConnected() MOZ_OVERRIDE; 1.131 + void OnError(const char* aFunction, int aErrno) MOZ_OVERRIDE; 1.132 + void OnListening() MOZ_OVERRIDE; 1.133 + void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE; 1.134 + void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE; 1.135 + 1.136 +private: 1.137 + // Set up flags on whatever our current file descriptor is. 1.138 + static bool SetSocketFlags(int aFd); 1.139 + 1.140 + void FireSocketError(); 1.141 + 1.142 + /** 1.143 + * Raw data queue. Must be pushed/popped from IO thread only. 1.144 + */ 1.145 + typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue; 1.146 + UnixSocketRawDataQueue mOutgoingQ; 1.147 + 1.148 + /** 1.149 + * Connector object used to create the connection we are currently using. 1.150 + */ 1.151 + nsAutoPtr<UnixSocketConnector> mConnector; 1.152 + 1.153 + /** 1.154 + * If true, do not requeue whatever task we're running 1.155 + */ 1.156 + bool mShuttingDownOnIOThread; 1.157 + 1.158 + /** 1.159 + * Address we are connecting to, assuming we are creating a client connection. 1.160 + */ 1.161 + nsCString mAddress; 1.162 + 1.163 + /** 1.164 + * Size of the socket address struct 1.165 + */ 1.166 + socklen_t mAddrSize; 1.167 + 1.168 + /** 1.169 + * Address struct of the socket currently in use 1.170 + */ 1.171 + sockaddr_any mAddr; 1.172 + 1.173 + /** 1.174 + * Task member for delayed connect task. Should only be access on main thread. 1.175 + */ 1.176 + CancelableTask* mDelayedConnectTask; 1.177 +}; 1.178 + 1.179 +template<class T> 1.180 +class DeleteInstanceRunnable : public nsRunnable 1.181 +{ 1.182 +public: 1.183 + DeleteInstanceRunnable(T* aInstance) 1.184 + : mInstance(aInstance) 1.185 + { } 1.186 + 1.187 + NS_IMETHOD Run() 1.188 + { 1.189 + delete mInstance; 1.190 + 1.191 + return NS_OK; 1.192 + } 1.193 + 1.194 +private: 1.195 + T* mInstance; 1.196 +}; 1.197 + 1.198 +class UnixSocketImplRunnable : public nsRunnable 1.199 +{ 1.200 +public: 1.201 + UnixSocketImpl* GetImpl() const 1.202 + { 1.203 + return mImpl; 1.204 + } 1.205 +protected: 1.206 + UnixSocketImplRunnable(UnixSocketImpl* aImpl) 1.207 + : mImpl(aImpl) 1.208 + { 1.209 + MOZ_ASSERT(aImpl); 1.210 + } 1.211 + virtual ~UnixSocketImplRunnable() 1.212 + { } 1.213 +private: 1.214 + UnixSocketImpl* mImpl; 1.215 +}; 1.216 + 1.217 +class OnSocketEventRunnable : public UnixSocketImplRunnable 1.218 +{ 1.219 +public: 1.220 + enum SocketEvent { 1.221 + CONNECT_SUCCESS, 1.222 + CONNECT_ERROR, 1.223 + DISCONNECT 1.224 + }; 1.225 + 1.226 + OnSocketEventRunnable(UnixSocketImpl* aImpl, SocketEvent e) 1.227 + : UnixSocketImplRunnable(aImpl) 1.228 + , mEvent(e) 1.229 + { 1.230 + MOZ_ASSERT(!NS_IsMainThread()); 1.231 + } 1.232 + 1.233 + NS_IMETHOD Run() MOZ_OVERRIDE 1.234 + { 1.235 + MOZ_ASSERT(NS_IsMainThread()); 1.236 + 1.237 + UnixSocketImpl* impl = GetImpl(); 1.238 + 1.239 + if (impl->IsShutdownOnMainThread()) { 1.240 + NS_WARNING("CloseSocket has already been called!"); 1.241 + // Since we've already explicitly closed and the close happened before 1.242 + // this, this isn't really an error. Since we've warned, return OK. 1.243 + return NS_OK; 1.244 + } 1.245 + if (mEvent == CONNECT_SUCCESS) { 1.246 + impl->mConsumer->NotifySuccess(); 1.247 + } else if (mEvent == CONNECT_ERROR) { 1.248 + impl->mConsumer->NotifyError(); 1.249 + } else if (mEvent == DISCONNECT) { 1.250 + impl->mConsumer->NotifyDisconnect(); 1.251 + } 1.252 + return NS_OK; 1.253 + } 1.254 +private: 1.255 + SocketEvent mEvent; 1.256 +}; 1.257 + 1.258 +class SocketReceiveRunnable : public UnixSocketImplRunnable 1.259 +{ 1.260 +public: 1.261 + SocketReceiveRunnable(UnixSocketImpl* aImpl, UnixSocketRawData* aData) 1.262 + : UnixSocketImplRunnable(aImpl) 1.263 + , mRawData(aData) 1.264 + { 1.265 + MOZ_ASSERT(aData); 1.266 + } 1.267 + 1.268 + NS_IMETHOD Run() MOZ_OVERRIDE 1.269 + { 1.270 + MOZ_ASSERT(NS_IsMainThread()); 1.271 + 1.272 + UnixSocketImpl* impl = GetImpl(); 1.273 + 1.274 + if (impl->IsShutdownOnMainThread()) { 1.275 + NS_WARNING("mConsumer is null, aborting receive!"); 1.276 + // Since we've already explicitly closed and the close happened before 1.277 + // this, this isn't really an error. Since we've warned, return OK. 1.278 + return NS_OK; 1.279 + } 1.280 + 1.281 + MOZ_ASSERT(impl->mConsumer); 1.282 + impl->mConsumer->ReceiveSocketData(mRawData); 1.283 + return NS_OK; 1.284 + } 1.285 +private: 1.286 + nsAutoPtr<UnixSocketRawData> mRawData; 1.287 +}; 1.288 + 1.289 +class RequestClosingSocketRunnable : public UnixSocketImplRunnable 1.290 +{ 1.291 +public: 1.292 + RequestClosingSocketRunnable(UnixSocketImpl* aImpl) 1.293 + : UnixSocketImplRunnable(aImpl) 1.294 + { } 1.295 + 1.296 + NS_IMETHOD Run() MOZ_OVERRIDE 1.297 + { 1.298 + MOZ_ASSERT(NS_IsMainThread()); 1.299 + 1.300 + UnixSocketImpl* impl = GetImpl(); 1.301 + if (impl->IsShutdownOnMainThread()) { 1.302 + NS_WARNING("CloseSocket has already been called!"); 1.303 + // Since we've already explicitly closed and the close happened before 1.304 + // this, this isn't really an error. Since we've warned, return OK. 1.305 + return NS_OK; 1.306 + } 1.307 + 1.308 + // Start from here, same handling flow as calling CloseSocket() from 1.309 + // upper layer 1.310 + impl->mConsumer->CloseSocket(); 1.311 + return NS_OK; 1.312 + } 1.313 +}; 1.314 + 1.315 +class UnixSocketImplTask : public CancelableTask 1.316 +{ 1.317 +public: 1.318 + UnixSocketImpl* GetImpl() const 1.319 + { 1.320 + return mImpl; 1.321 + } 1.322 + void Cancel() MOZ_OVERRIDE 1.323 + { 1.324 + mImpl = nullptr; 1.325 + } 1.326 + bool IsCanceled() const 1.327 + { 1.328 + return !mImpl; 1.329 + } 1.330 +protected: 1.331 + UnixSocketImplTask(UnixSocketImpl* aImpl) 1.332 + : mImpl(aImpl) 1.333 + { 1.334 + MOZ_ASSERT(mImpl); 1.335 + } 1.336 +private: 1.337 + UnixSocketImpl* mImpl; 1.338 +}; 1.339 + 1.340 +class SocketSendTask : public UnixSocketImplTask 1.341 +{ 1.342 +public: 1.343 + SocketSendTask(UnixSocketImpl* aImpl, 1.344 + UnixSocketConsumer* aConsumer, 1.345 + UnixSocketRawData* aData) 1.346 + : UnixSocketImplTask(aImpl) 1.347 + , mConsumer(aConsumer) 1.348 + , mData(aData) 1.349 + { 1.350 + MOZ_ASSERT(aConsumer); 1.351 + MOZ_ASSERT(aData); 1.352 + } 1.353 + void Run() MOZ_OVERRIDE 1.354 + { 1.355 + MOZ_ASSERT(!NS_IsMainThread()); 1.356 + MOZ_ASSERT(!IsCanceled()); 1.357 + 1.358 + UnixSocketImpl* impl = GetImpl(); 1.359 + MOZ_ASSERT(!impl->IsShutdownOnIOThread()); 1.360 + 1.361 + impl->QueueWriteData(mData); 1.362 + } 1.363 +private: 1.364 + nsRefPtr<UnixSocketConsumer> mConsumer; 1.365 + UnixSocketRawData* mData; 1.366 +}; 1.367 + 1.368 +class SocketListenTask : public UnixSocketImplTask 1.369 +{ 1.370 +public: 1.371 + SocketListenTask(UnixSocketImpl* aImpl) 1.372 + : UnixSocketImplTask(aImpl) 1.373 + { } 1.374 + 1.375 + void Run() MOZ_OVERRIDE 1.376 + { 1.377 + MOZ_ASSERT(!NS_IsMainThread()); 1.378 + if (!IsCanceled()) { 1.379 + GetImpl()->Listen(); 1.380 + } 1.381 + } 1.382 +}; 1.383 + 1.384 +class SocketConnectTask : public UnixSocketImplTask 1.385 +{ 1.386 +public: 1.387 + SocketConnectTask(UnixSocketImpl* aImpl) 1.388 + : UnixSocketImplTask(aImpl) 1.389 + { } 1.390 + 1.391 + void Run() MOZ_OVERRIDE 1.392 + { 1.393 + MOZ_ASSERT(!NS_IsMainThread()); 1.394 + MOZ_ASSERT(!IsCanceled()); 1.395 + GetImpl()->Connect(); 1.396 + } 1.397 +}; 1.398 + 1.399 +class SocketDelayedConnectTask : public UnixSocketImplTask 1.400 +{ 1.401 +public: 1.402 + SocketDelayedConnectTask(UnixSocketImpl* aImpl) 1.403 + : UnixSocketImplTask(aImpl) 1.404 + { } 1.405 + 1.406 + void Run() MOZ_OVERRIDE 1.407 + { 1.408 + MOZ_ASSERT(NS_IsMainThread()); 1.409 + if (IsCanceled()) { 1.410 + return; 1.411 + } 1.412 + UnixSocketImpl* impl = GetImpl(); 1.413 + if (impl->IsShutdownOnMainThread()) { 1.414 + return; 1.415 + } 1.416 + impl->ClearDelayedConnectTask(); 1.417 + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketConnectTask(impl)); 1.418 + } 1.419 +}; 1.420 + 1.421 +class ShutdownSocketTask : public UnixSocketImplTask 1.422 +{ 1.423 +public: 1.424 + ShutdownSocketTask(UnixSocketImpl* aImpl) 1.425 + : UnixSocketImplTask(aImpl) 1.426 + { } 1.427 + 1.428 + void Run() MOZ_OVERRIDE 1.429 + { 1.430 + MOZ_ASSERT(!NS_IsMainThread()); 1.431 + MOZ_ASSERT(!IsCanceled()); 1.432 + 1.433 + UnixSocketImpl* impl = GetImpl(); 1.434 + 1.435 + // At this point, there should be no new events on the IO thread after this 1.436 + // one with the possible exception of a SocketListenTask that 1.437 + // ShutdownOnIOThread will cancel for us. We are now fully shut down, so we 1.438 + // can send a message to the main thread that will delete impl safely knowing 1.439 + // that no more tasks reference it. 1.440 + impl->ShutdownOnIOThread(); 1.441 + 1.442 + nsRefPtr<nsIRunnable> r(new DeleteInstanceRunnable<UnixSocketImpl>(impl)); 1.443 + nsresult rv = NS_DispatchToMainThread(r); 1.444 + NS_ENSURE_SUCCESS_VOID(rv); 1.445 + } 1.446 +}; 1.447 + 1.448 +void 1.449 +UnixSocketImpl::FireSocketError() 1.450 +{ 1.451 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.452 + 1.453 + // Clean up watchers, statuses, fds 1.454 + Close(); 1.455 + 1.456 + // Tell the main thread we've errored 1.457 + nsRefPtr<OnSocketEventRunnable> r = 1.458 + new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_ERROR); 1.459 + NS_DispatchToMainThread(r); 1.460 +} 1.461 + 1.462 +void 1.463 +UnixSocketImpl::Listen() 1.464 +{ 1.465 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.466 + MOZ_ASSERT(mConnector); 1.467 + 1.468 + // This will set things we don't particularly care about, but it will hand 1.469 + // back the correct structure size which is what we do care about. 1.470 + if (!mConnector->CreateAddr(true, mAddrSize, mAddr, nullptr)) { 1.471 + NS_WARNING("Cannot create socket address!"); 1.472 + FireSocketError(); 1.473 + return; 1.474 + } 1.475 + 1.476 + if (!IsOpen()) { 1.477 + int fd = mConnector->Create(); 1.478 + if (fd < 0) { 1.479 + NS_WARNING("Cannot create socket fd!"); 1.480 + FireSocketError(); 1.481 + return; 1.482 + } 1.483 + if (!SetSocketFlags(fd)) { 1.484 + NS_WARNING("Cannot set socket flags!"); 1.485 + FireSocketError(); 1.486 + return; 1.487 + } 1.488 + SetFd(fd); 1.489 + 1.490 + // calls OnListening on success, or OnError otherwise 1.491 + nsresult rv = UnixSocketWatcher::Listen( 1.492 + reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize); 1.493 + NS_WARN_IF(NS_FAILED(rv)); 1.494 + } 1.495 +} 1.496 + 1.497 +void 1.498 +UnixSocketImpl::Connect() 1.499 +{ 1.500 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.501 + MOZ_ASSERT(mConnector); 1.502 + 1.503 + if (!IsOpen()) { 1.504 + int fd = mConnector->Create(); 1.505 + if (fd < 0) { 1.506 + NS_WARNING("Cannot create socket fd!"); 1.507 + FireSocketError(); 1.508 + return; 1.509 + } 1.510 + if (!SetSocketFlags(fd)) { 1.511 + NS_WARNING("Cannot set socket flags!"); 1.512 + FireSocketError(); 1.513 + return; 1.514 + } 1.515 + SetFd(fd); 1.516 + } 1.517 + 1.518 + if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) { 1.519 + NS_WARNING("Cannot create socket address!"); 1.520 + FireSocketError(); 1.521 + return; 1.522 + } 1.523 + 1.524 + // calls OnConnected() on success, or OnError() otherwise 1.525 + nsresult rv = UnixSocketWatcher::Connect( 1.526 + reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize); 1.527 + NS_WARN_IF(NS_FAILED(rv)); 1.528 +} 1.529 + 1.530 +bool 1.531 +UnixSocketImpl::SetSocketFlags(int aFd) 1.532 +{ 1.533 + // Set socket addr to be reused even if kernel is still waiting to close 1.534 + int n = 1; 1.535 + if (setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n)) < 0) { 1.536 + return false; 1.537 + } 1.538 + 1.539 + // Set close-on-exec bit. 1.540 + int flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFD)); 1.541 + if (-1 == flags) { 1.542 + return false; 1.543 + } 1.544 + flags |= FD_CLOEXEC; 1.545 + if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFD, flags))) { 1.546 + return false; 1.547 + } 1.548 + 1.549 + // Set non-blocking status flag. 1.550 + flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFL)); 1.551 + if (-1 == flags) { 1.552 + return false; 1.553 + } 1.554 + flags |= O_NONBLOCK; 1.555 + if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags))) { 1.556 + return false; 1.557 + } 1.558 + 1.559 + return true; 1.560 +} 1.561 + 1.562 +void 1.563 +UnixSocketImpl::OnAccepted(int aFd, 1.564 + const sockaddr_any* aAddr, 1.565 + socklen_t aAddrLen) 1.566 +{ 1.567 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.568 + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); 1.569 + MOZ_ASSERT(aAddr); 1.570 + MOZ_ASSERT(aAddrLen <= sizeof(mAddr)); 1.571 + 1.572 + memcpy (&mAddr, aAddr, aAddrLen); 1.573 + mAddrSize = aAddrLen; 1.574 + 1.575 + if (!mConnector->SetUp(aFd)) { 1.576 + NS_WARNING("Could not set up socket!"); 1.577 + return; 1.578 + } 1.579 + 1.580 + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); 1.581 + Close(); 1.582 + if (!SetSocketFlags(aFd)) { 1.583 + return; 1.584 + } 1.585 + SetSocket(aFd, SOCKET_IS_CONNECTED); 1.586 + 1.587 + nsRefPtr<OnSocketEventRunnable> r = 1.588 + new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS); 1.589 + NS_DispatchToMainThread(r); 1.590 + 1.591 + AddWatchers(READ_WATCHER, true); 1.592 + if (!mOutgoingQ.IsEmpty()) { 1.593 + AddWatchers(WRITE_WATCHER, false); 1.594 + } 1.595 +} 1.596 + 1.597 +void 1.598 +UnixSocketImpl::OnConnected() 1.599 +{ 1.600 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.601 + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); 1.602 + 1.603 + if (!SetSocketFlags(GetFd())) { 1.604 + NS_WARNING("Cannot set socket flags!"); 1.605 + FireSocketError(); 1.606 + return; 1.607 + } 1.608 + 1.609 + if (!mConnector->SetUp(GetFd())) { 1.610 + NS_WARNING("Could not set up socket!"); 1.611 + FireSocketError(); 1.612 + return; 1.613 + } 1.614 + 1.615 + nsRefPtr<OnSocketEventRunnable> r = 1.616 + new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS); 1.617 + NS_DispatchToMainThread(r); 1.618 + 1.619 + AddWatchers(READ_WATCHER, true); 1.620 + if (!mOutgoingQ.IsEmpty()) { 1.621 + AddWatchers(WRITE_WATCHER, false); 1.622 + } 1.623 +} 1.624 + 1.625 +void 1.626 +UnixSocketImpl::OnListening() 1.627 +{ 1.628 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.629 + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); 1.630 + 1.631 + if (!mConnector->SetUpListenSocket(GetFd())) { 1.632 + NS_WARNING("Could not set up listen socket!"); 1.633 + FireSocketError(); 1.634 + return; 1.635 + } 1.636 + 1.637 + AddWatchers(READ_WATCHER, true); 1.638 +} 1.639 + 1.640 +void 1.641 +UnixSocketImpl::OnError(const char* aFunction, int aErrno) 1.642 +{ 1.643 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.644 + 1.645 + UnixFdWatcher::OnError(aFunction, aErrno); 1.646 + FireSocketError(); 1.647 +} 1.648 + 1.649 +void 1.650 +UnixSocketImpl::OnSocketCanReceiveWithoutBlocking() 1.651 +{ 1.652 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.653 + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 1.654 + 1.655 + // Read all of the incoming data. 1.656 + while (true) { 1.657 + nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE)); 1.658 + 1.659 + ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize); 1.660 + if (ret <= 0) { 1.661 + if (ret == -1) { 1.662 + if (errno == EINTR) { 1.663 + continue; // retry system call when interrupted 1.664 + } 1.665 + if (errno == EAGAIN || errno == EWOULDBLOCK) { 1.666 + return; // no data available: return and re-poll 1.667 + } 1.668 + 1.669 +#ifdef DEBUG 1.670 + NS_WARNING("Cannot read from network"); 1.671 +#endif 1.672 + // else fall through to error handling on other errno's 1.673 + } 1.674 + 1.675 + // We're done with our descriptors. Ensure that spurious events don't 1.676 + // cause us to end up back here. 1.677 + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); 1.678 + nsRefPtr<RequestClosingSocketRunnable> r = 1.679 + new RequestClosingSocketRunnable(this); 1.680 + NS_DispatchToMainThread(r); 1.681 + return; 1.682 + } 1.683 + 1.684 +#ifdef MOZ_TASK_TRACER 1.685 + // Make unix socket creation events to be the source events of TaskTracer, 1.686 + // and originate the rest correlation tasks from here. 1.687 + AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET); 1.688 +#endif 1.689 + 1.690 + incoming->mSize = ret; 1.691 + nsRefPtr<SocketReceiveRunnable> r = 1.692 + new SocketReceiveRunnable(this, incoming.forget()); 1.693 + NS_DispatchToMainThread(r); 1.694 + 1.695 + // If ret is less than MAX_READ_SIZE, there's no 1.696 + // more data in the socket for us to read now. 1.697 + if (ret < ssize_t(MAX_READ_SIZE)) { 1.698 + return; 1.699 + } 1.700 + } 1.701 +} 1.702 + 1.703 +void 1.704 +UnixSocketImpl::OnSocketCanSendWithoutBlocking() 1.705 +{ 1.706 + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); 1.707 + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 1.708 + 1.709 + // Try to write the bytes of mCurrentRilRawData. If all were written, continue. 1.710 + // 1.711 + // Otherwise, save the byte position of the next byte to write 1.712 + // within mCurrentWriteOffset, and request another write when the 1.713 + // system won't block. 1.714 + // 1.715 + while (true) { 1.716 + UnixSocketRawData* data; 1.717 + if (mOutgoingQ.IsEmpty()) { 1.718 + return; 1.719 + } 1.720 + data = mOutgoingQ.ElementAt(0); 1.721 + const uint8_t *toWrite; 1.722 + toWrite = data->mData; 1.723 + 1.724 + while (data->mCurrentWriteOffset < data->mSize) { 1.725 + ssize_t write_amount = data->mSize - data->mCurrentWriteOffset; 1.726 + ssize_t written; 1.727 + written = write (GetFd(), toWrite + data->mCurrentWriteOffset, 1.728 + write_amount); 1.729 + if (written > 0) { 1.730 + data->mCurrentWriteOffset += written; 1.731 + } 1.732 + if (written != write_amount) { 1.733 + break; 1.734 + } 1.735 + } 1.736 + 1.737 + if (data->mCurrentWriteOffset != data->mSize) { 1.738 + AddWatchers(WRITE_WATCHER, false); 1.739 + return; 1.740 + } 1.741 + mOutgoingQ.RemoveElementAt(0); 1.742 + delete data; 1.743 + } 1.744 +} 1.745 + 1.746 +UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr) 1.747 + , mConnectionStatus(SOCKET_DISCONNECTED) 1.748 + , mConnectTimestamp(0) 1.749 + , mConnectDelayMs(0) 1.750 +{ 1.751 +} 1.752 + 1.753 +UnixSocketConsumer::~UnixSocketConsumer() 1.754 +{ 1.755 + MOZ_ASSERT(mConnectionStatus == SOCKET_DISCONNECTED); 1.756 + MOZ_ASSERT(!mImpl); 1.757 +} 1.758 + 1.759 +bool 1.760 +UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData) 1.761 +{ 1.762 + MOZ_ASSERT(NS_IsMainThread()); 1.763 + if (!mImpl) { 1.764 + return false; 1.765 + } 1.766 + 1.767 + MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); 1.768 + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, 1.769 + new SocketSendTask(mImpl, this, aData)); 1.770 + return true; 1.771 +} 1.772 + 1.773 +bool 1.774 +UnixSocketConsumer::SendSocketData(const nsACString& aStr) 1.775 +{ 1.776 + MOZ_ASSERT(NS_IsMainThread()); 1.777 + if (!mImpl) { 1.778 + return false; 1.779 + } 1.780 + if (aStr.Length() > MAX_READ_SIZE) { 1.781 + return false; 1.782 + } 1.783 + 1.784 + MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); 1.785 + UnixSocketRawData* d = new UnixSocketRawData(aStr.BeginReading(), 1.786 + aStr.Length()); 1.787 + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, 1.788 + new SocketSendTask(mImpl, this, d)); 1.789 + return true; 1.790 +} 1.791 + 1.792 +void 1.793 +UnixSocketConsumer::CloseSocket() 1.794 +{ 1.795 + MOZ_ASSERT(NS_IsMainThread()); 1.796 + if (!mImpl) { 1.797 + return; 1.798 + } 1.799 + 1.800 + mImpl->CancelDelayedConnectTask(); 1.801 + 1.802 + // From this point on, we consider mImpl as being deleted. 1.803 + // We sever the relationship here so any future calls to listen or connect 1.804 + // will create a new implementation. 1.805 + mImpl->ShutdownOnMainThread(); 1.806 + 1.807 + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, 1.808 + new ShutdownSocketTask(mImpl)); 1.809 + 1.810 + mImpl = nullptr; 1.811 + 1.812 + NotifyDisconnect(); 1.813 +} 1.814 + 1.815 +void 1.816 +UnixSocketConsumer::GetSocketAddr(nsAString& aAddrStr) 1.817 +{ 1.818 + aAddrStr.Truncate(); 1.819 + if (!mImpl || mConnectionStatus != SOCKET_CONNECTED) { 1.820 + NS_WARNING("No socket currently open!"); 1.821 + return; 1.822 + } 1.823 + mImpl->GetSocketAddr(aAddrStr); 1.824 +} 1.825 + 1.826 +void 1.827 +UnixSocketConsumer::NotifySuccess() 1.828 +{ 1.829 + MOZ_ASSERT(NS_IsMainThread()); 1.830 + mConnectionStatus = SOCKET_CONNECTED; 1.831 + mConnectTimestamp = PR_IntervalNow(); 1.832 + OnConnectSuccess(); 1.833 +} 1.834 + 1.835 +void 1.836 +UnixSocketConsumer::NotifyError() 1.837 +{ 1.838 + MOZ_ASSERT(NS_IsMainThread()); 1.839 + mConnectionStatus = SOCKET_DISCONNECTED; 1.840 + mConnectDelayMs = CalculateConnectDelayMs(); 1.841 + OnConnectError(); 1.842 +} 1.843 + 1.844 +void 1.845 +UnixSocketConsumer::NotifyDisconnect() 1.846 +{ 1.847 + MOZ_ASSERT(NS_IsMainThread()); 1.848 + mConnectionStatus = SOCKET_DISCONNECTED; 1.849 + mConnectDelayMs = CalculateConnectDelayMs(); 1.850 + OnDisconnect(); 1.851 +} 1.852 + 1.853 +bool 1.854 +UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector, 1.855 + const char* aAddress, 1.856 + int aDelayMs) 1.857 +{ 1.858 + MOZ_ASSERT(aConnector); 1.859 + MOZ_ASSERT(NS_IsMainThread()); 1.860 + 1.861 + nsAutoPtr<UnixSocketConnector> connector(aConnector); 1.862 + 1.863 + if (mImpl) { 1.864 + NS_WARNING("Socket already connecting/connected!"); 1.865 + return false; 1.866 + } 1.867 + 1.868 + nsCString addr(aAddress); 1.869 + MessageLoop* ioLoop = XRE_GetIOMessageLoop(); 1.870 + mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr); 1.871 + mConnectionStatus = SOCKET_CONNECTING; 1.872 + if (aDelayMs > 0) { 1.873 + SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl); 1.874 + mImpl->SetDelayedConnectTask(connectTask); 1.875 + MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs); 1.876 + } else { 1.877 + ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl)); 1.878 + } 1.879 + return true; 1.880 +} 1.881 + 1.882 +bool 1.883 +UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector) 1.884 +{ 1.885 + MOZ_ASSERT(aConnector); 1.886 + MOZ_ASSERT(NS_IsMainThread()); 1.887 + 1.888 + nsAutoPtr<UnixSocketConnector> connector(aConnector); 1.889 + 1.890 + if (mImpl) { 1.891 + NS_WARNING("Socket already connecting/connected!"); 1.892 + return false; 1.893 + } 1.894 + 1.895 + mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(), 1.896 + EmptyCString()); 1.897 + mConnectionStatus = SOCKET_LISTENING; 1.898 + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, 1.899 + new SocketListenTask(mImpl)); 1.900 + return true; 1.901 +} 1.902 + 1.903 +uint32_t 1.904 +UnixSocketConsumer::CalculateConnectDelayMs() const 1.905 +{ 1.906 + MOZ_ASSERT(NS_IsMainThread()); 1.907 + 1.908 + uint32_t connectDelayMs = mConnectDelayMs; 1.909 + 1.910 + if ((PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) { 1.911 + // reset delay if connection has been opened for a while, or... 1.912 + connectDelayMs = 0; 1.913 + } else if (!connectDelayMs) { 1.914 + // ...start with a delay of ~1 sec, or... 1.915 + connectDelayMs = 1<<10; 1.916 + } else if (connectDelayMs < (1<<16)) { 1.917 + // ...otherwise increase delay by a factor of 2 1.918 + connectDelayMs <<= 1; 1.919 + } 1.920 + return connectDelayMs; 1.921 +} 1.922 + 1.923 +} // namespace ipc 1.924 +} // namespace mozilla