diff -r 000000000000 -r 6474c204b198 ipc/unixsocket/UnixSocket.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ipc/unixsocket/UnixSocket.cpp Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,921 @@ +/* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "UnixSocket.h" +#include "nsTArray.h" +#include "nsXULAppAPI.h" +#include + +#ifdef MOZ_TASK_TRACER +#include "GeckoTaskTracer.h" +using namespace mozilla::tasktracer; +#endif + +static const size_t MAX_READ_SIZE = 1 << 16; + +namespace mozilla { +namespace ipc { + +class UnixSocketImpl : public UnixSocketWatcher +{ +public: + UnixSocketImpl(MessageLoop* mIOLoop, + UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector, + const nsACString& aAddress) + : UnixSocketWatcher(mIOLoop) + , mConsumer(aConsumer) + , mConnector(aConnector) + , mShuttingDownOnIOThread(false) + , mAddress(aAddress) + , mDelayedConnectTask(nullptr) + { + } + + ~UnixSocketImpl() + { + MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsShutdownOnMainThread()); + } + + void QueueWriteData(UnixSocketRawData* aData) + { + mOutgoingQ.AppendElement(aData); + AddWatchers(WRITE_WATCHER, false); + } + + bool IsShutdownOnMainThread() + { + MOZ_ASSERT(NS_IsMainThread()); + return mConsumer == nullptr; + } + + void ShutdownOnMainThread() + { + MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(!IsShutdownOnMainThread()); + mConsumer = nullptr; + } + + bool IsShutdownOnIOThread() + { + return mShuttingDownOnIOThread; + } + + void ShutdownOnIOThread() + { + MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!mShuttingDownOnIOThread); + + Close(); // will also remove fd from I/O loop + mShuttingDownOnIOThread = true; + } + + void SetDelayedConnectTask(CancelableTask* aTask) + { + MOZ_ASSERT(NS_IsMainThread()); + mDelayedConnectTask = aTask; + } + + void ClearDelayedConnectTask() + { + MOZ_ASSERT(NS_IsMainThread()); + mDelayedConnectTask = nullptr; + } + + void CancelDelayedConnectTask() + { + MOZ_ASSERT(NS_IsMainThread()); + if (!mDelayedConnectTask) { + return; + } + mDelayedConnectTask->Cancel(); + ClearDelayedConnectTask(); + } + + /** + * Connect to a socket + */ + void Connect(); + + /** + * Run bind/listen to prepare for further runs of accept() + */ + void Listen(); + + void GetSocketAddr(nsAString& aAddrStr) + { + if (!mConnector) { + NS_WARNING("No connector to get socket address from!"); + aAddrStr.Truncate(); + return; + } + mConnector->GetSocketAddr(mAddr, aAddrStr); + } + + /** + * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated + * directly from main thread. All non-main-thread accesses should happen with + * mImpl as container. + */ + RefPtr mConsumer; + + void OnAccepted(int aFd, const sockaddr_any* aAddr, + socklen_t aAddrLen) MOZ_OVERRIDE; + void OnConnected() MOZ_OVERRIDE; + void OnError(const char* aFunction, int aErrno) MOZ_OVERRIDE; + void OnListening() MOZ_OVERRIDE; + void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE; + void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE; + +private: + // Set up flags on whatever our current file descriptor is. + static bool SetSocketFlags(int aFd); + + void FireSocketError(); + + /** + * Raw data queue. Must be pushed/popped from IO thread only. + */ + typedef nsTArray UnixSocketRawDataQueue; + UnixSocketRawDataQueue mOutgoingQ; + + /** + * Connector object used to create the connection we are currently using. + */ + nsAutoPtr mConnector; + + /** + * If true, do not requeue whatever task we're running + */ + bool mShuttingDownOnIOThread; + + /** + * Address we are connecting to, assuming we are creating a client connection. + */ + nsCString mAddress; + + /** + * Size of the socket address struct + */ + socklen_t mAddrSize; + + /** + * Address struct of the socket currently in use + */ + sockaddr_any mAddr; + + /** + * Task member for delayed connect task. Should only be access on main thread. + */ + CancelableTask* mDelayedConnectTask; +}; + +template +class DeleteInstanceRunnable : public nsRunnable +{ +public: + DeleteInstanceRunnable(T* aInstance) + : mInstance(aInstance) + { } + + NS_IMETHOD Run() + { + delete mInstance; + + return NS_OK; + } + +private: + T* mInstance; +}; + +class UnixSocketImplRunnable : public nsRunnable +{ +public: + UnixSocketImpl* GetImpl() const + { + return mImpl; + } +protected: + UnixSocketImplRunnable(UnixSocketImpl* aImpl) + : mImpl(aImpl) + { + MOZ_ASSERT(aImpl); + } + virtual ~UnixSocketImplRunnable() + { } +private: + UnixSocketImpl* mImpl; +}; + +class OnSocketEventRunnable : public UnixSocketImplRunnable +{ +public: + enum SocketEvent { + CONNECT_SUCCESS, + CONNECT_ERROR, + DISCONNECT + }; + + OnSocketEventRunnable(UnixSocketImpl* aImpl, SocketEvent e) + : UnixSocketImplRunnable(aImpl) + , mEvent(e) + { + MOZ_ASSERT(!NS_IsMainThread()); + } + + NS_IMETHOD Run() MOZ_OVERRIDE + { + MOZ_ASSERT(NS_IsMainThread()); + + UnixSocketImpl* impl = GetImpl(); + + if (impl->IsShutdownOnMainThread()) { + NS_WARNING("CloseSocket has already been called!"); + // Since we've already explicitly closed and the close happened before + // this, this isn't really an error. Since we've warned, return OK. + return NS_OK; + } + if (mEvent == CONNECT_SUCCESS) { + impl->mConsumer->NotifySuccess(); + } else if (mEvent == CONNECT_ERROR) { + impl->mConsumer->NotifyError(); + } else if (mEvent == DISCONNECT) { + impl->mConsumer->NotifyDisconnect(); + } + return NS_OK; + } +private: + SocketEvent mEvent; +}; + +class SocketReceiveRunnable : public UnixSocketImplRunnable +{ +public: + SocketReceiveRunnable(UnixSocketImpl* aImpl, UnixSocketRawData* aData) + : UnixSocketImplRunnable(aImpl) + , mRawData(aData) + { + MOZ_ASSERT(aData); + } + + NS_IMETHOD Run() MOZ_OVERRIDE + { + MOZ_ASSERT(NS_IsMainThread()); + + UnixSocketImpl* impl = GetImpl(); + + if (impl->IsShutdownOnMainThread()) { + NS_WARNING("mConsumer is null, aborting receive!"); + // Since we've already explicitly closed and the close happened before + // this, this isn't really an error. Since we've warned, return OK. + return NS_OK; + } + + MOZ_ASSERT(impl->mConsumer); + impl->mConsumer->ReceiveSocketData(mRawData); + return NS_OK; + } +private: + nsAutoPtr mRawData; +}; + +class RequestClosingSocketRunnable : public UnixSocketImplRunnable +{ +public: + RequestClosingSocketRunnable(UnixSocketImpl* aImpl) + : UnixSocketImplRunnable(aImpl) + { } + + NS_IMETHOD Run() MOZ_OVERRIDE + { + MOZ_ASSERT(NS_IsMainThread()); + + UnixSocketImpl* impl = GetImpl(); + if (impl->IsShutdownOnMainThread()) { + NS_WARNING("CloseSocket has already been called!"); + // Since we've already explicitly closed and the close happened before + // this, this isn't really an error. Since we've warned, return OK. + return NS_OK; + } + + // Start from here, same handling flow as calling CloseSocket() from + // upper layer + impl->mConsumer->CloseSocket(); + return NS_OK; + } +}; + +class UnixSocketImplTask : public CancelableTask +{ +public: + UnixSocketImpl* GetImpl() const + { + return mImpl; + } + void Cancel() MOZ_OVERRIDE + { + mImpl = nullptr; + } + bool IsCanceled() const + { + return !mImpl; + } +protected: + UnixSocketImplTask(UnixSocketImpl* aImpl) + : mImpl(aImpl) + { + MOZ_ASSERT(mImpl); + } +private: + UnixSocketImpl* mImpl; +}; + +class SocketSendTask : public UnixSocketImplTask +{ +public: + SocketSendTask(UnixSocketImpl* aImpl, + UnixSocketConsumer* aConsumer, + UnixSocketRawData* aData) + : UnixSocketImplTask(aImpl) + , mConsumer(aConsumer) + , mData(aData) + { + MOZ_ASSERT(aConsumer); + MOZ_ASSERT(aData); + } + void Run() MOZ_OVERRIDE + { + MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsCanceled()); + + UnixSocketImpl* impl = GetImpl(); + MOZ_ASSERT(!impl->IsShutdownOnIOThread()); + + impl->QueueWriteData(mData); + } +private: + nsRefPtr mConsumer; + UnixSocketRawData* mData; +}; + +class SocketListenTask : public UnixSocketImplTask +{ +public: + SocketListenTask(UnixSocketImpl* aImpl) + : UnixSocketImplTask(aImpl) + { } + + void Run() MOZ_OVERRIDE + { + MOZ_ASSERT(!NS_IsMainThread()); + if (!IsCanceled()) { + GetImpl()->Listen(); + } + } +}; + +class SocketConnectTask : public UnixSocketImplTask +{ +public: + SocketConnectTask(UnixSocketImpl* aImpl) + : UnixSocketImplTask(aImpl) + { } + + void Run() MOZ_OVERRIDE + { + MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsCanceled()); + GetImpl()->Connect(); + } +}; + +class SocketDelayedConnectTask : public UnixSocketImplTask +{ +public: + SocketDelayedConnectTask(UnixSocketImpl* aImpl) + : UnixSocketImplTask(aImpl) + { } + + void Run() MOZ_OVERRIDE + { + MOZ_ASSERT(NS_IsMainThread()); + if (IsCanceled()) { + return; + } + UnixSocketImpl* impl = GetImpl(); + if (impl->IsShutdownOnMainThread()) { + return; + } + impl->ClearDelayedConnectTask(); + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketConnectTask(impl)); + } +}; + +class ShutdownSocketTask : public UnixSocketImplTask +{ +public: + ShutdownSocketTask(UnixSocketImpl* aImpl) + : UnixSocketImplTask(aImpl) + { } + + void Run() MOZ_OVERRIDE + { + MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsCanceled()); + + UnixSocketImpl* impl = GetImpl(); + + // At this point, there should be no new events on the IO thread after this + // one with the possible exception of a SocketListenTask that + // ShutdownOnIOThread will cancel for us. We are now fully shut down, so we + // can send a message to the main thread that will delete impl safely knowing + // that no more tasks reference it. + impl->ShutdownOnIOThread(); + + nsRefPtr r(new DeleteInstanceRunnable(impl)); + nsresult rv = NS_DispatchToMainThread(r); + NS_ENSURE_SUCCESS_VOID(rv); + } +}; + +void +UnixSocketImpl::FireSocketError() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + // Clean up watchers, statuses, fds + Close(); + + // Tell the main thread we've errored + nsRefPtr r = + new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_ERROR); + NS_DispatchToMainThread(r); +} + +void +UnixSocketImpl::Listen() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(mConnector); + + // This will set things we don't particularly care about, but it will hand + // back the correct structure size which is what we do care about. + if (!mConnector->CreateAddr(true, mAddrSize, mAddr, nullptr)) { + NS_WARNING("Cannot create socket address!"); + FireSocketError(); + return; + } + + if (!IsOpen()) { + int fd = mConnector->Create(); + if (fd < 0) { + NS_WARNING("Cannot create socket fd!"); + FireSocketError(); + return; + } + if (!SetSocketFlags(fd)) { + NS_WARNING("Cannot set socket flags!"); + FireSocketError(); + return; + } + SetFd(fd); + + // calls OnListening on success, or OnError otherwise + nsresult rv = UnixSocketWatcher::Listen( + reinterpret_cast(&mAddr), mAddrSize); + NS_WARN_IF(NS_FAILED(rv)); + } +} + +void +UnixSocketImpl::Connect() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(mConnector); + + if (!IsOpen()) { + int fd = mConnector->Create(); + if (fd < 0) { + NS_WARNING("Cannot create socket fd!"); + FireSocketError(); + return; + } + if (!SetSocketFlags(fd)) { + NS_WARNING("Cannot set socket flags!"); + FireSocketError(); + return; + } + SetFd(fd); + } + + if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) { + NS_WARNING("Cannot create socket address!"); + FireSocketError(); + return; + } + + // calls OnConnected() on success, or OnError() otherwise + nsresult rv = UnixSocketWatcher::Connect( + reinterpret_cast(&mAddr), mAddrSize); + NS_WARN_IF(NS_FAILED(rv)); +} + +bool +UnixSocketImpl::SetSocketFlags(int aFd) +{ + // Set socket addr to be reused even if kernel is still waiting to close + int n = 1; + if (setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n)) < 0) { + return false; + } + + // Set close-on-exec bit. + int flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFD)); + if (-1 == flags) { + return false; + } + flags |= FD_CLOEXEC; + if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFD, flags))) { + return false; + } + + // Set non-blocking status flag. + flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFL)); + if (-1 == flags) { + return false; + } + flags |= O_NONBLOCK; + if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags))) { + return false; + } + + return true; +} + +void +UnixSocketImpl::OnAccepted(int aFd, + const sockaddr_any* aAddr, + socklen_t aAddrLen) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); + MOZ_ASSERT(aAddr); + MOZ_ASSERT(aAddrLen <= sizeof(mAddr)); + + memcpy (&mAddr, aAddr, aAddrLen); + mAddrSize = aAddrLen; + + if (!mConnector->SetUp(aFd)) { + NS_WARNING("Could not set up socket!"); + return; + } + + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); + Close(); + if (!SetSocketFlags(aFd)) { + return; + } + SetSocket(aFd, SOCKET_IS_CONNECTED); + + nsRefPtr r = + new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS); + NS_DispatchToMainThread(r); + + AddWatchers(READ_WATCHER, true); + if (!mOutgoingQ.IsEmpty()) { + AddWatchers(WRITE_WATCHER, false); + } +} + +void +UnixSocketImpl::OnConnected() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); + + if (!SetSocketFlags(GetFd())) { + NS_WARNING("Cannot set socket flags!"); + FireSocketError(); + return; + } + + if (!mConnector->SetUp(GetFd())) { + NS_WARNING("Could not set up socket!"); + FireSocketError(); + return; + } + + nsRefPtr r = + new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS); + NS_DispatchToMainThread(r); + + AddWatchers(READ_WATCHER, true); + if (!mOutgoingQ.IsEmpty()) { + AddWatchers(WRITE_WATCHER, false); + } +} + +void +UnixSocketImpl::OnListening() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); + + if (!mConnector->SetUpListenSocket(GetFd())) { + NS_WARNING("Could not set up listen socket!"); + FireSocketError(); + return; + } + + AddWatchers(READ_WATCHER, true); +} + +void +UnixSocketImpl::OnError(const char* aFunction, int aErrno) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + UnixFdWatcher::OnError(aFunction, aErrno); + FireSocketError(); +} + +void +UnixSocketImpl::OnSocketCanReceiveWithoutBlocking() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 + + // Read all of the incoming data. + while (true) { + nsAutoPtr incoming(new UnixSocketRawData(MAX_READ_SIZE)); + + ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize); + if (ret <= 0) { + if (ret == -1) { + if (errno == EINTR) { + continue; // retry system call when interrupted + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; // no data available: return and re-poll + } + +#ifdef DEBUG + NS_WARNING("Cannot read from network"); +#endif + // else fall through to error handling on other errno's + } + + // We're done with our descriptors. Ensure that spurious events don't + // cause us to end up back here. + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); + nsRefPtr r = + new RequestClosingSocketRunnable(this); + NS_DispatchToMainThread(r); + return; + } + +#ifdef MOZ_TASK_TRACER + // Make unix socket creation events to be the source events of TaskTracer, + // and originate the rest correlation tasks from here. + AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET); +#endif + + incoming->mSize = ret; + nsRefPtr r = + new SocketReceiveRunnable(this, incoming.forget()); + NS_DispatchToMainThread(r); + + // If ret is less than MAX_READ_SIZE, there's no + // more data in the socket for us to read now. + if (ret < ssize_t(MAX_READ_SIZE)) { + return; + } + } +} + +void +UnixSocketImpl::OnSocketCanSendWithoutBlocking() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 + + // Try to write the bytes of mCurrentRilRawData. If all were written, continue. + // + // Otherwise, save the byte position of the next byte to write + // within mCurrentWriteOffset, and request another write when the + // system won't block. + // + while (true) { + UnixSocketRawData* data; + if (mOutgoingQ.IsEmpty()) { + return; + } + data = mOutgoingQ.ElementAt(0); + const uint8_t *toWrite; + toWrite = data->mData; + + while (data->mCurrentWriteOffset < data->mSize) { + ssize_t write_amount = data->mSize - data->mCurrentWriteOffset; + ssize_t written; + written = write (GetFd(), toWrite + data->mCurrentWriteOffset, + write_amount); + if (written > 0) { + data->mCurrentWriteOffset += written; + } + if (written != write_amount) { + break; + } + } + + if (data->mCurrentWriteOffset != data->mSize) { + AddWatchers(WRITE_WATCHER, false); + return; + } + mOutgoingQ.RemoveElementAt(0); + delete data; + } +} + +UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr) + , mConnectionStatus(SOCKET_DISCONNECTED) + , mConnectTimestamp(0) + , mConnectDelayMs(0) +{ +} + +UnixSocketConsumer::~UnixSocketConsumer() +{ + MOZ_ASSERT(mConnectionStatus == SOCKET_DISCONNECTED); + MOZ_ASSERT(!mImpl); +} + +bool +UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData) +{ + MOZ_ASSERT(NS_IsMainThread()); + if (!mImpl) { + return false; + } + + MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, + new SocketSendTask(mImpl, this, aData)); + return true; +} + +bool +UnixSocketConsumer::SendSocketData(const nsACString& aStr) +{ + MOZ_ASSERT(NS_IsMainThread()); + if (!mImpl) { + return false; + } + if (aStr.Length() > MAX_READ_SIZE) { + return false; + } + + MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); + UnixSocketRawData* d = new UnixSocketRawData(aStr.BeginReading(), + aStr.Length()); + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, + new SocketSendTask(mImpl, this, d)); + return true; +} + +void +UnixSocketConsumer::CloseSocket() +{ + MOZ_ASSERT(NS_IsMainThread()); + if (!mImpl) { + return; + } + + mImpl->CancelDelayedConnectTask(); + + // From this point on, we consider mImpl as being deleted. + // We sever the relationship here so any future calls to listen or connect + // will create a new implementation. + mImpl->ShutdownOnMainThread(); + + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, + new ShutdownSocketTask(mImpl)); + + mImpl = nullptr; + + NotifyDisconnect(); +} + +void +UnixSocketConsumer::GetSocketAddr(nsAString& aAddrStr) +{ + aAddrStr.Truncate(); + if (!mImpl || mConnectionStatus != SOCKET_CONNECTED) { + NS_WARNING("No socket currently open!"); + return; + } + mImpl->GetSocketAddr(aAddrStr); +} + +void +UnixSocketConsumer::NotifySuccess() +{ + MOZ_ASSERT(NS_IsMainThread()); + mConnectionStatus = SOCKET_CONNECTED; + mConnectTimestamp = PR_IntervalNow(); + OnConnectSuccess(); +} + +void +UnixSocketConsumer::NotifyError() +{ + MOZ_ASSERT(NS_IsMainThread()); + mConnectionStatus = SOCKET_DISCONNECTED; + mConnectDelayMs = CalculateConnectDelayMs(); + OnConnectError(); +} + +void +UnixSocketConsumer::NotifyDisconnect() +{ + MOZ_ASSERT(NS_IsMainThread()); + mConnectionStatus = SOCKET_DISCONNECTED; + mConnectDelayMs = CalculateConnectDelayMs(); + OnDisconnect(); +} + +bool +UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector, + const char* aAddress, + int aDelayMs) +{ + MOZ_ASSERT(aConnector); + MOZ_ASSERT(NS_IsMainThread()); + + nsAutoPtr connector(aConnector); + + if (mImpl) { + NS_WARNING("Socket already connecting/connected!"); + return false; + } + + nsCString addr(aAddress); + MessageLoop* ioLoop = XRE_GetIOMessageLoop(); + mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr); + mConnectionStatus = SOCKET_CONNECTING; + if (aDelayMs > 0) { + SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl); + mImpl->SetDelayedConnectTask(connectTask); + MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs); + } else { + ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl)); + } + return true; +} + +bool +UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector) +{ + MOZ_ASSERT(aConnector); + MOZ_ASSERT(NS_IsMainThread()); + + nsAutoPtr connector(aConnector); + + if (mImpl) { + NS_WARNING("Socket already connecting/connected!"); + return false; + } + + mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(), + EmptyCString()); + mConnectionStatus = SOCKET_LISTENING; + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, + new SocketListenTask(mImpl)); + return true; +} + +uint32_t +UnixSocketConsumer::CalculateConnectDelayMs() const +{ + MOZ_ASSERT(NS_IsMainThread()); + + uint32_t connectDelayMs = mConnectDelayMs; + + if ((PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) { + // reset delay if connection has been opened for a while, or... + connectDelayMs = 0; + } else if (!connectDelayMs) { + // ...start with a delay of ~1 sec, or... + connectDelayMs = 1<<10; + } else if (connectDelayMs < (1<<16)) { + // ...otherwise increase delay by a factor of 2 + connectDelayMs <<= 1; + } + return connectDelayMs; +} + +} // namespace ipc +} // namespace mozilla