michael@0: /* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ michael@0: /* vim: set ts=2 et sw=2 tw=80: */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "UnixSocket.h" michael@0: #include "nsTArray.h" michael@0: #include "nsXULAppAPI.h" michael@0: #include michael@0: michael@0: #ifdef MOZ_TASK_TRACER michael@0: #include "GeckoTaskTracer.h" michael@0: using namespace mozilla::tasktracer; michael@0: #endif michael@0: michael@0: static const size_t MAX_READ_SIZE = 1 << 16; michael@0: michael@0: namespace mozilla { michael@0: namespace ipc { michael@0: michael@0: class UnixSocketImpl : public UnixSocketWatcher michael@0: { michael@0: public: michael@0: UnixSocketImpl(MessageLoop* mIOLoop, michael@0: UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector, michael@0: const nsACString& aAddress) michael@0: : UnixSocketWatcher(mIOLoop) michael@0: , mConsumer(aConsumer) michael@0: , mConnector(aConnector) michael@0: , mShuttingDownOnIOThread(false) michael@0: , mAddress(aAddress) michael@0: , mDelayedConnectTask(nullptr) michael@0: { michael@0: } michael@0: michael@0: ~UnixSocketImpl() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: MOZ_ASSERT(IsShutdownOnMainThread()); michael@0: } michael@0: michael@0: void QueueWriteData(UnixSocketRawData* aData) michael@0: { michael@0: mOutgoingQ.AppendElement(aData); michael@0: AddWatchers(WRITE_WATCHER, false); michael@0: } michael@0: michael@0: bool IsShutdownOnMainThread() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: return mConsumer == nullptr; michael@0: } michael@0: michael@0: void ShutdownOnMainThread() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: MOZ_ASSERT(!IsShutdownOnMainThread()); michael@0: mConsumer = nullptr; michael@0: } michael@0: michael@0: bool IsShutdownOnIOThread() michael@0: { michael@0: return mShuttingDownOnIOThread; michael@0: } michael@0: michael@0: void ShutdownOnIOThread() michael@0: { michael@0: MOZ_ASSERT(!NS_IsMainThread()); michael@0: MOZ_ASSERT(!mShuttingDownOnIOThread); michael@0: michael@0: Close(); // will also remove fd from I/O loop michael@0: mShuttingDownOnIOThread = true; michael@0: } michael@0: michael@0: void SetDelayedConnectTask(CancelableTask* aTask) michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: mDelayedConnectTask = aTask; michael@0: } michael@0: michael@0: void ClearDelayedConnectTask() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: mDelayedConnectTask = nullptr; michael@0: } michael@0: michael@0: void CancelDelayedConnectTask() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: if (!mDelayedConnectTask) { michael@0: return; michael@0: } michael@0: mDelayedConnectTask->Cancel(); michael@0: ClearDelayedConnectTask(); michael@0: } michael@0: michael@0: /** michael@0: * Connect to a socket michael@0: */ michael@0: void Connect(); michael@0: michael@0: /** michael@0: * Run bind/listen to prepare for further runs of accept() michael@0: */ michael@0: void Listen(); michael@0: michael@0: void GetSocketAddr(nsAString& aAddrStr) michael@0: { michael@0: if (!mConnector) { michael@0: NS_WARNING("No connector to get socket address from!"); michael@0: aAddrStr.Truncate(); michael@0: return; michael@0: } michael@0: mConnector->GetSocketAddr(mAddr, aAddrStr); michael@0: } michael@0: michael@0: /** michael@0: * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated michael@0: * directly from main thread. All non-main-thread accesses should happen with michael@0: * mImpl as container. michael@0: */ michael@0: RefPtr mConsumer; michael@0: michael@0: void OnAccepted(int aFd, const sockaddr_any* aAddr, michael@0: socklen_t aAddrLen) MOZ_OVERRIDE; michael@0: void OnConnected() MOZ_OVERRIDE; michael@0: void OnError(const char* aFunction, int aErrno) MOZ_OVERRIDE; michael@0: void OnListening() MOZ_OVERRIDE; michael@0: void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE; michael@0: void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE; michael@0: michael@0: private: michael@0: // Set up flags on whatever our current file descriptor is. michael@0: static bool SetSocketFlags(int aFd); michael@0: michael@0: void FireSocketError(); michael@0: michael@0: /** michael@0: * Raw data queue. Must be pushed/popped from IO thread only. michael@0: */ michael@0: typedef nsTArray UnixSocketRawDataQueue; michael@0: UnixSocketRawDataQueue mOutgoingQ; michael@0: michael@0: /** michael@0: * Connector object used to create the connection we are currently using. michael@0: */ michael@0: nsAutoPtr mConnector; michael@0: michael@0: /** michael@0: * If true, do not requeue whatever task we're running michael@0: */ michael@0: bool mShuttingDownOnIOThread; michael@0: michael@0: /** michael@0: * Address we are connecting to, assuming we are creating a client connection. michael@0: */ michael@0: nsCString mAddress; michael@0: michael@0: /** michael@0: * Size of the socket address struct michael@0: */ michael@0: socklen_t mAddrSize; michael@0: michael@0: /** michael@0: * Address struct of the socket currently in use michael@0: */ michael@0: sockaddr_any mAddr; michael@0: michael@0: /** michael@0: * Task member for delayed connect task. Should only be access on main thread. michael@0: */ michael@0: CancelableTask* mDelayedConnectTask; michael@0: }; michael@0: michael@0: template michael@0: class DeleteInstanceRunnable : public nsRunnable michael@0: { michael@0: public: michael@0: DeleteInstanceRunnable(T* aInstance) michael@0: : mInstance(aInstance) michael@0: { } michael@0: michael@0: NS_IMETHOD Run() michael@0: { michael@0: delete mInstance; michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: T* mInstance; michael@0: }; michael@0: michael@0: class UnixSocketImplRunnable : public nsRunnable michael@0: { michael@0: public: michael@0: UnixSocketImpl* GetImpl() const michael@0: { michael@0: return mImpl; michael@0: } michael@0: protected: michael@0: UnixSocketImplRunnable(UnixSocketImpl* aImpl) michael@0: : mImpl(aImpl) michael@0: { michael@0: MOZ_ASSERT(aImpl); michael@0: } michael@0: virtual ~UnixSocketImplRunnable() michael@0: { } michael@0: private: michael@0: UnixSocketImpl* mImpl; michael@0: }; michael@0: michael@0: class OnSocketEventRunnable : public UnixSocketImplRunnable michael@0: { michael@0: public: michael@0: enum SocketEvent { michael@0: CONNECT_SUCCESS, michael@0: CONNECT_ERROR, michael@0: DISCONNECT michael@0: }; michael@0: michael@0: OnSocketEventRunnable(UnixSocketImpl* aImpl, SocketEvent e) michael@0: : UnixSocketImplRunnable(aImpl) michael@0: , mEvent(e) michael@0: { michael@0: MOZ_ASSERT(!NS_IsMainThread()); michael@0: } michael@0: michael@0: NS_IMETHOD Run() MOZ_OVERRIDE michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: michael@0: UnixSocketImpl* impl = GetImpl(); michael@0: michael@0: if (impl->IsShutdownOnMainThread()) { michael@0: NS_WARNING("CloseSocket has already been called!"); michael@0: // Since we've already explicitly closed and the close happened before michael@0: // this, this isn't really an error. Since we've warned, return OK. michael@0: return NS_OK; michael@0: } michael@0: if (mEvent == CONNECT_SUCCESS) { michael@0: impl->mConsumer->NotifySuccess(); michael@0: } else if (mEvent == CONNECT_ERROR) { michael@0: impl->mConsumer->NotifyError(); michael@0: } else if (mEvent == DISCONNECT) { michael@0: impl->mConsumer->NotifyDisconnect(); michael@0: } michael@0: return NS_OK; michael@0: } michael@0: private: michael@0: SocketEvent mEvent; michael@0: }; michael@0: michael@0: class SocketReceiveRunnable : public UnixSocketImplRunnable michael@0: { michael@0: public: michael@0: SocketReceiveRunnable(UnixSocketImpl* aImpl, UnixSocketRawData* aData) michael@0: : UnixSocketImplRunnable(aImpl) michael@0: , mRawData(aData) michael@0: { michael@0: MOZ_ASSERT(aData); michael@0: } michael@0: michael@0: NS_IMETHOD Run() MOZ_OVERRIDE michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: michael@0: UnixSocketImpl* impl = GetImpl(); michael@0: michael@0: if (impl->IsShutdownOnMainThread()) { michael@0: NS_WARNING("mConsumer is null, aborting receive!"); michael@0: // Since we've already explicitly closed and the close happened before michael@0: // this, this isn't really an error. Since we've warned, return OK. michael@0: return NS_OK; michael@0: } michael@0: michael@0: MOZ_ASSERT(impl->mConsumer); michael@0: impl->mConsumer->ReceiveSocketData(mRawData); michael@0: return NS_OK; michael@0: } michael@0: private: michael@0: nsAutoPtr mRawData; michael@0: }; michael@0: michael@0: class RequestClosingSocketRunnable : public UnixSocketImplRunnable michael@0: { michael@0: public: michael@0: RequestClosingSocketRunnable(UnixSocketImpl* aImpl) michael@0: : UnixSocketImplRunnable(aImpl) michael@0: { } michael@0: michael@0: NS_IMETHOD Run() MOZ_OVERRIDE michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: michael@0: UnixSocketImpl* impl = GetImpl(); michael@0: if (impl->IsShutdownOnMainThread()) { michael@0: NS_WARNING("CloseSocket has already been called!"); michael@0: // Since we've already explicitly closed and the close happened before michael@0: // this, this isn't really an error. Since we've warned, return OK. michael@0: return NS_OK; michael@0: } michael@0: michael@0: // Start from here, same handling flow as calling CloseSocket() from michael@0: // upper layer michael@0: impl->mConsumer->CloseSocket(); michael@0: return NS_OK; michael@0: } michael@0: }; michael@0: michael@0: class UnixSocketImplTask : public CancelableTask michael@0: { michael@0: public: michael@0: UnixSocketImpl* GetImpl() const michael@0: { michael@0: return mImpl; michael@0: } michael@0: void Cancel() MOZ_OVERRIDE michael@0: { michael@0: mImpl = nullptr; michael@0: } michael@0: bool IsCanceled() const michael@0: { michael@0: return !mImpl; michael@0: } michael@0: protected: michael@0: UnixSocketImplTask(UnixSocketImpl* aImpl) michael@0: : mImpl(aImpl) michael@0: { michael@0: MOZ_ASSERT(mImpl); michael@0: } michael@0: private: michael@0: UnixSocketImpl* mImpl; michael@0: }; michael@0: michael@0: class SocketSendTask : public UnixSocketImplTask michael@0: { michael@0: public: michael@0: SocketSendTask(UnixSocketImpl* aImpl, michael@0: UnixSocketConsumer* aConsumer, michael@0: UnixSocketRawData* aData) michael@0: : UnixSocketImplTask(aImpl) michael@0: , mConsumer(aConsumer) michael@0: , mData(aData) michael@0: { michael@0: MOZ_ASSERT(aConsumer); michael@0: MOZ_ASSERT(aData); michael@0: } michael@0: void Run() MOZ_OVERRIDE michael@0: { michael@0: MOZ_ASSERT(!NS_IsMainThread()); michael@0: MOZ_ASSERT(!IsCanceled()); michael@0: michael@0: UnixSocketImpl* impl = GetImpl(); michael@0: MOZ_ASSERT(!impl->IsShutdownOnIOThread()); michael@0: michael@0: impl->QueueWriteData(mData); michael@0: } michael@0: private: michael@0: nsRefPtr mConsumer; michael@0: UnixSocketRawData* mData; michael@0: }; michael@0: michael@0: class SocketListenTask : public UnixSocketImplTask michael@0: { michael@0: public: michael@0: SocketListenTask(UnixSocketImpl* aImpl) michael@0: : UnixSocketImplTask(aImpl) michael@0: { } michael@0: michael@0: void Run() MOZ_OVERRIDE michael@0: { michael@0: MOZ_ASSERT(!NS_IsMainThread()); michael@0: if (!IsCanceled()) { michael@0: GetImpl()->Listen(); michael@0: } michael@0: } michael@0: }; michael@0: michael@0: class SocketConnectTask : public UnixSocketImplTask michael@0: { michael@0: public: michael@0: SocketConnectTask(UnixSocketImpl* aImpl) michael@0: : UnixSocketImplTask(aImpl) michael@0: { } michael@0: michael@0: void Run() MOZ_OVERRIDE michael@0: { michael@0: MOZ_ASSERT(!NS_IsMainThread()); michael@0: MOZ_ASSERT(!IsCanceled()); michael@0: GetImpl()->Connect(); michael@0: } michael@0: }; michael@0: michael@0: class SocketDelayedConnectTask : public UnixSocketImplTask michael@0: { michael@0: public: michael@0: SocketDelayedConnectTask(UnixSocketImpl* aImpl) michael@0: : UnixSocketImplTask(aImpl) michael@0: { } michael@0: michael@0: void Run() MOZ_OVERRIDE michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: if (IsCanceled()) { michael@0: return; michael@0: } michael@0: UnixSocketImpl* impl = GetImpl(); michael@0: if (impl->IsShutdownOnMainThread()) { michael@0: return; michael@0: } michael@0: impl->ClearDelayedConnectTask(); michael@0: XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketConnectTask(impl)); michael@0: } michael@0: }; michael@0: michael@0: class ShutdownSocketTask : public UnixSocketImplTask michael@0: { michael@0: public: michael@0: ShutdownSocketTask(UnixSocketImpl* aImpl) michael@0: : UnixSocketImplTask(aImpl) michael@0: { } michael@0: michael@0: void Run() MOZ_OVERRIDE michael@0: { michael@0: MOZ_ASSERT(!NS_IsMainThread()); michael@0: MOZ_ASSERT(!IsCanceled()); michael@0: michael@0: UnixSocketImpl* impl = GetImpl(); michael@0: michael@0: // At this point, there should be no new events on the IO thread after this michael@0: // one with the possible exception of a SocketListenTask that michael@0: // ShutdownOnIOThread will cancel for us. We are now fully shut down, so we michael@0: // can send a message to the main thread that will delete impl safely knowing michael@0: // that no more tasks reference it. michael@0: impl->ShutdownOnIOThread(); michael@0: michael@0: nsRefPtr r(new DeleteInstanceRunnable(impl)); michael@0: nsresult rv = NS_DispatchToMainThread(r); michael@0: NS_ENSURE_SUCCESS_VOID(rv); michael@0: } michael@0: }; michael@0: michael@0: void michael@0: UnixSocketImpl::FireSocketError() michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: michael@0: // Clean up watchers, statuses, fds michael@0: Close(); michael@0: michael@0: // Tell the main thread we've errored michael@0: nsRefPtr r = michael@0: new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_ERROR); michael@0: NS_DispatchToMainThread(r); michael@0: } michael@0: michael@0: void michael@0: UnixSocketImpl::Listen() michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: MOZ_ASSERT(mConnector); michael@0: michael@0: // This will set things we don't particularly care about, but it will hand michael@0: // back the correct structure size which is what we do care about. michael@0: if (!mConnector->CreateAddr(true, mAddrSize, mAddr, nullptr)) { michael@0: NS_WARNING("Cannot create socket address!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: michael@0: if (!IsOpen()) { michael@0: int fd = mConnector->Create(); michael@0: if (fd < 0) { michael@0: NS_WARNING("Cannot create socket fd!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: if (!SetSocketFlags(fd)) { michael@0: NS_WARNING("Cannot set socket flags!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: SetFd(fd); michael@0: michael@0: // calls OnListening on success, or OnError otherwise michael@0: nsresult rv = UnixSocketWatcher::Listen( michael@0: reinterpret_cast(&mAddr), mAddrSize); michael@0: NS_WARN_IF(NS_FAILED(rv)); michael@0: } michael@0: } michael@0: michael@0: void michael@0: UnixSocketImpl::Connect() michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: MOZ_ASSERT(mConnector); michael@0: michael@0: if (!IsOpen()) { michael@0: int fd = mConnector->Create(); michael@0: if (fd < 0) { michael@0: NS_WARNING("Cannot create socket fd!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: if (!SetSocketFlags(fd)) { michael@0: NS_WARNING("Cannot set socket flags!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: SetFd(fd); michael@0: } michael@0: michael@0: if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) { michael@0: NS_WARNING("Cannot create socket address!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: michael@0: // calls OnConnected() on success, or OnError() otherwise michael@0: nsresult rv = UnixSocketWatcher::Connect( michael@0: reinterpret_cast(&mAddr), mAddrSize); michael@0: NS_WARN_IF(NS_FAILED(rv)); michael@0: } michael@0: michael@0: bool michael@0: UnixSocketImpl::SetSocketFlags(int aFd) michael@0: { michael@0: // Set socket addr to be reused even if kernel is still waiting to close michael@0: int n = 1; michael@0: if (setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n)) < 0) { michael@0: return false; michael@0: } michael@0: michael@0: // Set close-on-exec bit. michael@0: int flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFD)); michael@0: if (-1 == flags) { michael@0: return false; michael@0: } michael@0: flags |= FD_CLOEXEC; michael@0: if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFD, flags))) { michael@0: return false; michael@0: } michael@0: michael@0: // Set non-blocking status flag. michael@0: flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFL)); michael@0: if (-1 == flags) { michael@0: return false; michael@0: } michael@0: flags |= O_NONBLOCK; michael@0: if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags))) { michael@0: return false; michael@0: } michael@0: michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: UnixSocketImpl::OnAccepted(int aFd, michael@0: const sockaddr_any* aAddr, michael@0: socklen_t aAddrLen) michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); michael@0: MOZ_ASSERT(aAddr); michael@0: MOZ_ASSERT(aAddrLen <= sizeof(mAddr)); michael@0: michael@0: memcpy (&mAddr, aAddr, aAddrLen); michael@0: mAddrSize = aAddrLen; michael@0: michael@0: if (!mConnector->SetUp(aFd)) { michael@0: NS_WARNING("Could not set up socket!"); michael@0: return; michael@0: } michael@0: michael@0: RemoveWatchers(READ_WATCHER|WRITE_WATCHER); michael@0: Close(); michael@0: if (!SetSocketFlags(aFd)) { michael@0: return; michael@0: } michael@0: SetSocket(aFd, SOCKET_IS_CONNECTED); michael@0: michael@0: nsRefPtr r = michael@0: new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS); michael@0: NS_DispatchToMainThread(r); michael@0: michael@0: AddWatchers(READ_WATCHER, true); michael@0: if (!mOutgoingQ.IsEmpty()) { michael@0: AddWatchers(WRITE_WATCHER, false); michael@0: } michael@0: } michael@0: michael@0: void michael@0: UnixSocketImpl::OnConnected() michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); michael@0: michael@0: if (!SetSocketFlags(GetFd())) { michael@0: NS_WARNING("Cannot set socket flags!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: michael@0: if (!mConnector->SetUp(GetFd())) { michael@0: NS_WARNING("Could not set up socket!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: michael@0: nsRefPtr r = michael@0: new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS); michael@0: NS_DispatchToMainThread(r); michael@0: michael@0: AddWatchers(READ_WATCHER, true); michael@0: if (!mOutgoingQ.IsEmpty()) { michael@0: AddWatchers(WRITE_WATCHER, false); michael@0: } michael@0: } michael@0: michael@0: void michael@0: UnixSocketImpl::OnListening() michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); michael@0: michael@0: if (!mConnector->SetUpListenSocket(GetFd())) { michael@0: NS_WARNING("Could not set up listen socket!"); michael@0: FireSocketError(); michael@0: return; michael@0: } michael@0: michael@0: AddWatchers(READ_WATCHER, true); michael@0: } michael@0: michael@0: void michael@0: UnixSocketImpl::OnError(const char* aFunction, int aErrno) michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: michael@0: UnixFdWatcher::OnError(aFunction, aErrno); michael@0: FireSocketError(); michael@0: } michael@0: michael@0: void michael@0: UnixSocketImpl::OnSocketCanReceiveWithoutBlocking() michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 michael@0: michael@0: // Read all of the incoming data. michael@0: while (true) { michael@0: nsAutoPtr incoming(new UnixSocketRawData(MAX_READ_SIZE)); michael@0: michael@0: ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize); michael@0: if (ret <= 0) { michael@0: if (ret == -1) { michael@0: if (errno == EINTR) { michael@0: continue; // retry system call when interrupted michael@0: } michael@0: if (errno == EAGAIN || errno == EWOULDBLOCK) { michael@0: return; // no data available: return and re-poll michael@0: } michael@0: michael@0: #ifdef DEBUG michael@0: NS_WARNING("Cannot read from network"); michael@0: #endif michael@0: // else fall through to error handling on other errno's michael@0: } michael@0: michael@0: // We're done with our descriptors. Ensure that spurious events don't michael@0: // cause us to end up back here. michael@0: RemoveWatchers(READ_WATCHER|WRITE_WATCHER); michael@0: nsRefPtr r = michael@0: new RequestClosingSocketRunnable(this); michael@0: NS_DispatchToMainThread(r); michael@0: return; michael@0: } michael@0: michael@0: #ifdef MOZ_TASK_TRACER michael@0: // Make unix socket creation events to be the source events of TaskTracer, michael@0: // and originate the rest correlation tasks from here. michael@0: AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET); michael@0: #endif michael@0: michael@0: incoming->mSize = ret; michael@0: nsRefPtr r = michael@0: new SocketReceiveRunnable(this, incoming.forget()); michael@0: NS_DispatchToMainThread(r); michael@0: michael@0: // If ret is less than MAX_READ_SIZE, there's no michael@0: // more data in the socket for us to read now. michael@0: if (ret < ssize_t(MAX_READ_SIZE)) { michael@0: return; michael@0: } michael@0: } michael@0: } michael@0: michael@0: void michael@0: UnixSocketImpl::OnSocketCanSendWithoutBlocking() michael@0: { michael@0: MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); michael@0: MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 michael@0: michael@0: // Try to write the bytes of mCurrentRilRawData. If all were written, continue. michael@0: // michael@0: // Otherwise, save the byte position of the next byte to write michael@0: // within mCurrentWriteOffset, and request another write when the michael@0: // system won't block. michael@0: // michael@0: while (true) { michael@0: UnixSocketRawData* data; michael@0: if (mOutgoingQ.IsEmpty()) { michael@0: return; michael@0: } michael@0: data = mOutgoingQ.ElementAt(0); michael@0: const uint8_t *toWrite; michael@0: toWrite = data->mData; michael@0: michael@0: while (data->mCurrentWriteOffset < data->mSize) { michael@0: ssize_t write_amount = data->mSize - data->mCurrentWriteOffset; michael@0: ssize_t written; michael@0: written = write (GetFd(), toWrite + data->mCurrentWriteOffset, michael@0: write_amount); michael@0: if (written > 0) { michael@0: data->mCurrentWriteOffset += written; michael@0: } michael@0: if (written != write_amount) { michael@0: break; michael@0: } michael@0: } michael@0: michael@0: if (data->mCurrentWriteOffset != data->mSize) { michael@0: AddWatchers(WRITE_WATCHER, false); michael@0: return; michael@0: } michael@0: mOutgoingQ.RemoveElementAt(0); michael@0: delete data; michael@0: } michael@0: } michael@0: michael@0: UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr) michael@0: , mConnectionStatus(SOCKET_DISCONNECTED) michael@0: , mConnectTimestamp(0) michael@0: , mConnectDelayMs(0) michael@0: { michael@0: } michael@0: michael@0: UnixSocketConsumer::~UnixSocketConsumer() michael@0: { michael@0: MOZ_ASSERT(mConnectionStatus == SOCKET_DISCONNECTED); michael@0: MOZ_ASSERT(!mImpl); michael@0: } michael@0: michael@0: bool michael@0: UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData) michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: if (!mImpl) { michael@0: return false; michael@0: } michael@0: michael@0: MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); michael@0: XRE_GetIOMessageLoop()->PostTask(FROM_HERE, michael@0: new SocketSendTask(mImpl, this, aData)); michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: UnixSocketConsumer::SendSocketData(const nsACString& aStr) michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: if (!mImpl) { michael@0: return false; michael@0: } michael@0: if (aStr.Length() > MAX_READ_SIZE) { michael@0: return false; michael@0: } michael@0: michael@0: MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); michael@0: UnixSocketRawData* d = new UnixSocketRawData(aStr.BeginReading(), michael@0: aStr.Length()); michael@0: XRE_GetIOMessageLoop()->PostTask(FROM_HERE, michael@0: new SocketSendTask(mImpl, this, d)); michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: UnixSocketConsumer::CloseSocket() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: if (!mImpl) { michael@0: return; michael@0: } michael@0: michael@0: mImpl->CancelDelayedConnectTask(); michael@0: michael@0: // From this point on, we consider mImpl as being deleted. michael@0: // We sever the relationship here so any future calls to listen or connect michael@0: // will create a new implementation. michael@0: mImpl->ShutdownOnMainThread(); michael@0: michael@0: XRE_GetIOMessageLoop()->PostTask(FROM_HERE, michael@0: new ShutdownSocketTask(mImpl)); michael@0: michael@0: mImpl = nullptr; michael@0: michael@0: NotifyDisconnect(); michael@0: } michael@0: michael@0: void michael@0: UnixSocketConsumer::GetSocketAddr(nsAString& aAddrStr) michael@0: { michael@0: aAddrStr.Truncate(); michael@0: if (!mImpl || mConnectionStatus != SOCKET_CONNECTED) { michael@0: NS_WARNING("No socket currently open!"); michael@0: return; michael@0: } michael@0: mImpl->GetSocketAddr(aAddrStr); michael@0: } michael@0: michael@0: void michael@0: UnixSocketConsumer::NotifySuccess() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: mConnectionStatus = SOCKET_CONNECTED; michael@0: mConnectTimestamp = PR_IntervalNow(); michael@0: OnConnectSuccess(); michael@0: } michael@0: michael@0: void michael@0: UnixSocketConsumer::NotifyError() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: mConnectionStatus = SOCKET_DISCONNECTED; michael@0: mConnectDelayMs = CalculateConnectDelayMs(); michael@0: OnConnectError(); michael@0: } michael@0: michael@0: void michael@0: UnixSocketConsumer::NotifyDisconnect() michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: mConnectionStatus = SOCKET_DISCONNECTED; michael@0: mConnectDelayMs = CalculateConnectDelayMs(); michael@0: OnDisconnect(); michael@0: } michael@0: michael@0: bool michael@0: UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector, michael@0: const char* aAddress, michael@0: int aDelayMs) michael@0: { michael@0: MOZ_ASSERT(aConnector); michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: michael@0: nsAutoPtr connector(aConnector); michael@0: michael@0: if (mImpl) { michael@0: NS_WARNING("Socket already connecting/connected!"); michael@0: return false; michael@0: } michael@0: michael@0: nsCString addr(aAddress); michael@0: MessageLoop* ioLoop = XRE_GetIOMessageLoop(); michael@0: mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr); michael@0: mConnectionStatus = SOCKET_CONNECTING; michael@0: if (aDelayMs > 0) { michael@0: SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl); michael@0: mImpl->SetDelayedConnectTask(connectTask); michael@0: MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs); michael@0: } else { michael@0: ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl)); michael@0: } michael@0: return true; michael@0: } michael@0: michael@0: bool michael@0: UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector) michael@0: { michael@0: MOZ_ASSERT(aConnector); michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: michael@0: nsAutoPtr connector(aConnector); michael@0: michael@0: if (mImpl) { michael@0: NS_WARNING("Socket already connecting/connected!"); michael@0: return false; michael@0: } michael@0: michael@0: mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(), michael@0: EmptyCString()); michael@0: mConnectionStatus = SOCKET_LISTENING; michael@0: XRE_GetIOMessageLoop()->PostTask(FROM_HERE, michael@0: new SocketListenTask(mImpl)); michael@0: return true; michael@0: } michael@0: michael@0: uint32_t michael@0: UnixSocketConsumer::CalculateConnectDelayMs() const michael@0: { michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: michael@0: uint32_t connectDelayMs = mConnectDelayMs; michael@0: michael@0: if ((PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) { michael@0: // reset delay if connection has been opened for a while, or... michael@0: connectDelayMs = 0; michael@0: } else if (!connectDelayMs) { michael@0: // ...start with a delay of ~1 sec, or... michael@0: connectDelayMs = 1<<10; michael@0: } else if (connectDelayMs < (1<<16)) { michael@0: // ...otherwise increase delay by a factor of 2 michael@0: connectDelayMs <<= 1; michael@0: } michael@0: return connectDelayMs; michael@0: } michael@0: michael@0: } // namespace ipc michael@0: } // namespace mozilla