ipc/unixsocket/UnixSocket.cpp

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purposes.

     1 /* -*- Mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; tab-width: 40 -*- */
     2 /* vim: set ts=2 et sw=2 tw=80: */
     3 /* This Source Code Form is subject to the terms of the Mozilla Public
     4  * License, v. 2.0. If a copy of the MPL was not distributed with this
     5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     7 #include "UnixSocket.h"
     8 #include "nsTArray.h"
     9 #include "nsXULAppAPI.h"
    10 #include <fcntl.h>
    12 #ifdef MOZ_TASK_TRACER
    13 #include "GeckoTaskTracer.h"
    14 using namespace mozilla::tasktracer;
    15 #endif
    17 static const size_t MAX_READ_SIZE = 1 << 16;
    19 namespace mozilla {
    20 namespace ipc {
    22 class UnixSocketImpl : public UnixSocketWatcher
    23 {
    24 public:
    25   UnixSocketImpl(MessageLoop* mIOLoop,
    26                  UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
    27                  const nsACString& aAddress)
    28     : UnixSocketWatcher(mIOLoop)
    29     , mConsumer(aConsumer)
    30     , mConnector(aConnector)
    31     , mShuttingDownOnIOThread(false)
    32     , mAddress(aAddress)
    33     , mDelayedConnectTask(nullptr)
    34   {
    35   }
    37   ~UnixSocketImpl()
    38   {
    39     MOZ_ASSERT(NS_IsMainThread());
    40     MOZ_ASSERT(IsShutdownOnMainThread());
    41   }
    43   void QueueWriteData(UnixSocketRawData* aData)
    44   {
    45     mOutgoingQ.AppendElement(aData);
    46     AddWatchers(WRITE_WATCHER, false);
    47   }
    49   bool IsShutdownOnMainThread()
    50   {
    51     MOZ_ASSERT(NS_IsMainThread());
    52     return mConsumer == nullptr;
    53   }
    55   void ShutdownOnMainThread()
    56   {
    57     MOZ_ASSERT(NS_IsMainThread());
    58     MOZ_ASSERT(!IsShutdownOnMainThread());
    59     mConsumer = nullptr;
    60   }
    62   bool IsShutdownOnIOThread()
    63   {
    64     return mShuttingDownOnIOThread;
    65   }
    67   void ShutdownOnIOThread()
    68   {
    69     MOZ_ASSERT(!NS_IsMainThread());
    70     MOZ_ASSERT(!mShuttingDownOnIOThread);
    72     Close(); // will also remove fd from I/O loop
    73     mShuttingDownOnIOThread = true;
    74   }
    76   void SetDelayedConnectTask(CancelableTask* aTask)
    77   {
    78     MOZ_ASSERT(NS_IsMainThread());
    79     mDelayedConnectTask = aTask;
    80   }
    82   void ClearDelayedConnectTask()
    83   {
    84     MOZ_ASSERT(NS_IsMainThread());
    85     mDelayedConnectTask = nullptr;
    86   }
    88   void CancelDelayedConnectTask()
    89   {
    90     MOZ_ASSERT(NS_IsMainThread());
    91     if (!mDelayedConnectTask) {
    92       return;
    93     }
    94     mDelayedConnectTask->Cancel();
    95     ClearDelayedConnectTask();
    96   }
    98   /**
    99    * Connect to a socket
   100    */
   101   void Connect();
   103   /**
   104    * Run bind/listen to prepare for further runs of accept()
   105    */
   106   void Listen();
   108   void GetSocketAddr(nsAString& aAddrStr)
   109   {
   110     if (!mConnector) {
   111       NS_WARNING("No connector to get socket address from!");
   112       aAddrStr.Truncate();
   113       return;
   114     }
   115     mConnector->GetSocketAddr(mAddr, aAddrStr);
   116   }
   118   /**
   119    * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
   120    * directly from main thread. All non-main-thread accesses should happen with
   121    * mImpl as container.
   122    */
   123   RefPtr<UnixSocketConsumer> mConsumer;
   125   void OnAccepted(int aFd, const sockaddr_any* aAddr,
   126                   socklen_t aAddrLen) MOZ_OVERRIDE;
   127   void OnConnected() MOZ_OVERRIDE;
   128   void OnError(const char* aFunction, int aErrno) MOZ_OVERRIDE;
   129   void OnListening() MOZ_OVERRIDE;
   130   void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE;
   131   void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE;
   133 private:
   134   // Set up flags on whatever our current file descriptor is.
   135   static bool SetSocketFlags(int aFd);
   137   void FireSocketError();
   139   /**
   140    * Raw data queue. Must be pushed/popped from IO thread only.
   141    */
   142   typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue;
   143   UnixSocketRawDataQueue mOutgoingQ;
   145   /**
   146    * Connector object used to create the connection we are currently using.
   147    */
   148   nsAutoPtr<UnixSocketConnector> mConnector;
   150   /**
   151    * If true, do not requeue whatever task we're running
   152    */
   153   bool mShuttingDownOnIOThread;
   155   /**
   156    * Address we are connecting to, assuming we are creating a client connection.
   157    */
   158   nsCString mAddress;
   160   /**
   161    * Size of the socket address struct
   162    */
   163   socklen_t mAddrSize;
   165   /**
   166    * Address struct of the socket currently in use
   167    */
   168   sockaddr_any mAddr;
   170   /**
   171    * Task member for delayed connect task. Should only be access on main thread.
   172    */
   173   CancelableTask* mDelayedConnectTask;
   174 };
   176 template<class T>
   177 class DeleteInstanceRunnable : public nsRunnable
   178 {
   179 public:
   180   DeleteInstanceRunnable(T* aInstance)
   181   : mInstance(aInstance)
   182   { }
   184   NS_IMETHOD Run()
   185   {
   186     delete mInstance;
   188     return NS_OK;
   189   }
   191 private:
   192   T* mInstance;
   193 };
   195 class UnixSocketImplRunnable : public nsRunnable
   196 {
   197 public:
   198   UnixSocketImpl* GetImpl() const
   199   {
   200     return mImpl;
   201   }
   202 protected:
   203   UnixSocketImplRunnable(UnixSocketImpl* aImpl)
   204   : mImpl(aImpl)
   205   {
   206     MOZ_ASSERT(aImpl);
   207   }
   208   virtual ~UnixSocketImplRunnable()
   209   { }
   210 private:
   211   UnixSocketImpl* mImpl;
   212 };
   214 class OnSocketEventRunnable : public UnixSocketImplRunnable
   215 {
   216 public:
   217   enum SocketEvent {
   218     CONNECT_SUCCESS,
   219     CONNECT_ERROR,
   220     DISCONNECT
   221   };
   223   OnSocketEventRunnable(UnixSocketImpl* aImpl, SocketEvent e)
   224   : UnixSocketImplRunnable(aImpl)
   225   , mEvent(e)
   226   {
   227     MOZ_ASSERT(!NS_IsMainThread());
   228   }
   230   NS_IMETHOD Run() MOZ_OVERRIDE
   231   {
   232     MOZ_ASSERT(NS_IsMainThread());
   234     UnixSocketImpl* impl = GetImpl();
   236     if (impl->IsShutdownOnMainThread()) {
   237       NS_WARNING("CloseSocket has already been called!");
   238       // Since we've already explicitly closed and the close happened before
   239       // this, this isn't really an error. Since we've warned, return OK.
   240       return NS_OK;
   241     }
   242     if (mEvent == CONNECT_SUCCESS) {
   243       impl->mConsumer->NotifySuccess();
   244     } else if (mEvent == CONNECT_ERROR) {
   245       impl->mConsumer->NotifyError();
   246     } else if (mEvent == DISCONNECT) {
   247       impl->mConsumer->NotifyDisconnect();
   248     }
   249     return NS_OK;
   250   }
   251 private:
   252   SocketEvent mEvent;
   253 };
   255 class SocketReceiveRunnable : public UnixSocketImplRunnable
   256 {
   257 public:
   258   SocketReceiveRunnable(UnixSocketImpl* aImpl, UnixSocketRawData* aData)
   259   : UnixSocketImplRunnable(aImpl)
   260   , mRawData(aData)
   261   {
   262     MOZ_ASSERT(aData);
   263   }
   265   NS_IMETHOD Run() MOZ_OVERRIDE
   266   {
   267     MOZ_ASSERT(NS_IsMainThread());
   269     UnixSocketImpl* impl = GetImpl();
   271     if (impl->IsShutdownOnMainThread()) {
   272       NS_WARNING("mConsumer is null, aborting receive!");
   273       // Since we've already explicitly closed and the close happened before
   274       // this, this isn't really an error. Since we've warned, return OK.
   275       return NS_OK;
   276     }
   278     MOZ_ASSERT(impl->mConsumer);
   279     impl->mConsumer->ReceiveSocketData(mRawData);
   280     return NS_OK;
   281   }
   282 private:
   283   nsAutoPtr<UnixSocketRawData> mRawData;
   284 };
   286 class RequestClosingSocketRunnable : public UnixSocketImplRunnable
   287 {
   288 public:
   289   RequestClosingSocketRunnable(UnixSocketImpl* aImpl)
   290   : UnixSocketImplRunnable(aImpl)
   291   { }
   293   NS_IMETHOD Run() MOZ_OVERRIDE
   294   {
   295     MOZ_ASSERT(NS_IsMainThread());
   297     UnixSocketImpl* impl = GetImpl();
   298     if (impl->IsShutdownOnMainThread()) {
   299       NS_WARNING("CloseSocket has already been called!");
   300       // Since we've already explicitly closed and the close happened before
   301       // this, this isn't really an error. Since we've warned, return OK.
   302       return NS_OK;
   303     }
   305     // Start from here, same handling flow as calling CloseSocket() from
   306     // upper layer
   307     impl->mConsumer->CloseSocket();
   308     return NS_OK;
   309   }
   310 };
   312 class UnixSocketImplTask : public CancelableTask
   313 {
   314 public:
   315   UnixSocketImpl* GetImpl() const
   316   {
   317     return mImpl;
   318   }
   319   void Cancel() MOZ_OVERRIDE
   320   {
   321     mImpl = nullptr;
   322   }
   323   bool IsCanceled() const
   324   {
   325     return !mImpl;
   326   }
   327 protected:
   328   UnixSocketImplTask(UnixSocketImpl* aImpl)
   329   : mImpl(aImpl)
   330   {
   331     MOZ_ASSERT(mImpl);
   332   }
   333 private:
   334   UnixSocketImpl* mImpl;
   335 };
   337 class SocketSendTask : public UnixSocketImplTask
   338 {
   339 public:
   340   SocketSendTask(UnixSocketImpl* aImpl,
   341                  UnixSocketConsumer* aConsumer,
   342                  UnixSocketRawData* aData)
   343   : UnixSocketImplTask(aImpl)
   344   , mConsumer(aConsumer)
   345   , mData(aData)
   346   {
   347     MOZ_ASSERT(aConsumer);
   348     MOZ_ASSERT(aData);
   349   }
   350   void Run() MOZ_OVERRIDE
   351   {
   352     MOZ_ASSERT(!NS_IsMainThread());
   353     MOZ_ASSERT(!IsCanceled());
   355     UnixSocketImpl* impl = GetImpl();
   356     MOZ_ASSERT(!impl->IsShutdownOnIOThread());
   358     impl->QueueWriteData(mData);
   359   }
   360 private:
   361   nsRefPtr<UnixSocketConsumer> mConsumer;
   362   UnixSocketRawData* mData;
   363 };
   365 class SocketListenTask : public UnixSocketImplTask
   366 {
   367 public:
   368   SocketListenTask(UnixSocketImpl* aImpl)
   369   : UnixSocketImplTask(aImpl)
   370   { }
   372   void Run() MOZ_OVERRIDE
   373   {
   374     MOZ_ASSERT(!NS_IsMainThread());
   375     if (!IsCanceled()) {
   376       GetImpl()->Listen();
   377     }
   378   }
   379 };
   381 class SocketConnectTask : public UnixSocketImplTask
   382 {
   383 public:
   384   SocketConnectTask(UnixSocketImpl* aImpl)
   385   : UnixSocketImplTask(aImpl)
   386   { }
   388   void Run() MOZ_OVERRIDE
   389   {
   390     MOZ_ASSERT(!NS_IsMainThread());
   391     MOZ_ASSERT(!IsCanceled());
   392     GetImpl()->Connect();
   393   }
   394 };
   396 class SocketDelayedConnectTask : public UnixSocketImplTask
   397 {
   398 public:
   399   SocketDelayedConnectTask(UnixSocketImpl* aImpl)
   400   : UnixSocketImplTask(aImpl)
   401   { }
   403   void Run() MOZ_OVERRIDE
   404   {
   405     MOZ_ASSERT(NS_IsMainThread());
   406     if (IsCanceled()) {
   407       return;
   408     }
   409     UnixSocketImpl* impl = GetImpl();
   410     if (impl->IsShutdownOnMainThread()) {
   411       return;
   412     }
   413     impl->ClearDelayedConnectTask();
   414     XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketConnectTask(impl));
   415   }
   416 };
   418 class ShutdownSocketTask : public UnixSocketImplTask
   419 {
   420 public:
   421   ShutdownSocketTask(UnixSocketImpl* aImpl)
   422   : UnixSocketImplTask(aImpl)
   423   { }
   425   void Run() MOZ_OVERRIDE
   426   {
   427     MOZ_ASSERT(!NS_IsMainThread());
   428     MOZ_ASSERT(!IsCanceled());
   430     UnixSocketImpl* impl = GetImpl();
   432     // At this point, there should be no new events on the IO thread after this
   433     // one with the possible exception of a SocketListenTask that
   434     // ShutdownOnIOThread will cancel for us. We are now fully shut down, so we
   435     // can send a message to the main thread that will delete impl safely knowing
   436     // that no more tasks reference it.
   437     impl->ShutdownOnIOThread();
   439     nsRefPtr<nsIRunnable> r(new DeleteInstanceRunnable<UnixSocketImpl>(impl));
   440     nsresult rv = NS_DispatchToMainThread(r);
   441     NS_ENSURE_SUCCESS_VOID(rv);
   442   }
   443 };
   445 void
   446 UnixSocketImpl::FireSocketError()
   447 {
   448   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   450   // Clean up watchers, statuses, fds
   451   Close();
   453   // Tell the main thread we've errored
   454   nsRefPtr<OnSocketEventRunnable> r =
   455     new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_ERROR);
   456   NS_DispatchToMainThread(r);
   457 }
   459 void
   460 UnixSocketImpl::Listen()
   461 {
   462   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   463   MOZ_ASSERT(mConnector);
   465   // This will set things we don't particularly care about, but it will hand
   466   // back the correct structure size which is what we do care about.
   467   if (!mConnector->CreateAddr(true, mAddrSize, mAddr, nullptr)) {
   468     NS_WARNING("Cannot create socket address!");
   469     FireSocketError();
   470     return;
   471   }
   473   if (!IsOpen()) {
   474     int fd = mConnector->Create();
   475     if (fd < 0) {
   476       NS_WARNING("Cannot create socket fd!");
   477       FireSocketError();
   478       return;
   479     }
   480     if (!SetSocketFlags(fd)) {
   481       NS_WARNING("Cannot set socket flags!");
   482       FireSocketError();
   483       return;
   484     }
   485     SetFd(fd);
   487     // calls OnListening on success, or OnError otherwise
   488     nsresult rv = UnixSocketWatcher::Listen(
   489       reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
   490     NS_WARN_IF(NS_FAILED(rv));
   491   }
   492 }
   494 void
   495 UnixSocketImpl::Connect()
   496 {
   497   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   498   MOZ_ASSERT(mConnector);
   500   if (!IsOpen()) {
   501     int fd = mConnector->Create();
   502     if (fd < 0) {
   503       NS_WARNING("Cannot create socket fd!");
   504       FireSocketError();
   505       return;
   506     }
   507     if (!SetSocketFlags(fd)) {
   508       NS_WARNING("Cannot set socket flags!");
   509       FireSocketError();
   510       return;
   511     }
   512     SetFd(fd);
   513   }
   515   if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) {
   516     NS_WARNING("Cannot create socket address!");
   517     FireSocketError();
   518     return;
   519   }
   521   // calls OnConnected() on success, or OnError() otherwise
   522   nsresult rv = UnixSocketWatcher::Connect(
   523     reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
   524   NS_WARN_IF(NS_FAILED(rv));
   525 }
   527 bool
   528 UnixSocketImpl::SetSocketFlags(int aFd)
   529 {
   530   // Set socket addr to be reused even if kernel is still waiting to close
   531   int n = 1;
   532   if (setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n)) < 0) {
   533     return false;
   534   }
   536   // Set close-on-exec bit.
   537   int flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFD));
   538   if (-1 == flags) {
   539     return false;
   540   }
   541   flags |= FD_CLOEXEC;
   542   if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFD, flags))) {
   543     return false;
   544   }
   546   // Set non-blocking status flag.
   547   flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFL));
   548   if (-1 == flags) {
   549     return false;
   550   }
   551   flags |= O_NONBLOCK;
   552   if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags))) {
   553     return false;
   554   }
   556   return true;
   557 }
   559 void
   560 UnixSocketImpl::OnAccepted(int aFd,
   561                            const sockaddr_any* aAddr,
   562                            socklen_t aAddrLen)
   563 {
   564   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   565   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
   566   MOZ_ASSERT(aAddr);
   567   MOZ_ASSERT(aAddrLen <= sizeof(mAddr));
   569   memcpy (&mAddr, aAddr, aAddrLen);
   570   mAddrSize = aAddrLen;
   572   if (!mConnector->SetUp(aFd)) {
   573     NS_WARNING("Could not set up socket!");
   574     return;
   575   }
   577   RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
   578   Close();
   579   if (!SetSocketFlags(aFd)) {
   580     return;
   581   }
   582   SetSocket(aFd, SOCKET_IS_CONNECTED);
   584   nsRefPtr<OnSocketEventRunnable> r =
   585     new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS);
   586   NS_DispatchToMainThread(r);
   588   AddWatchers(READ_WATCHER, true);
   589   if (!mOutgoingQ.IsEmpty()) {
   590     AddWatchers(WRITE_WATCHER, false);
   591   }
   592 }
   594 void
   595 UnixSocketImpl::OnConnected()
   596 {
   597   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   598   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
   600   if (!SetSocketFlags(GetFd())) {
   601     NS_WARNING("Cannot set socket flags!");
   602     FireSocketError();
   603     return;
   604   }
   606   if (!mConnector->SetUp(GetFd())) {
   607     NS_WARNING("Could not set up socket!");
   608     FireSocketError();
   609     return;
   610   }
   612   nsRefPtr<OnSocketEventRunnable> r =
   613     new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS);
   614   NS_DispatchToMainThread(r);
   616   AddWatchers(READ_WATCHER, true);
   617   if (!mOutgoingQ.IsEmpty()) {
   618     AddWatchers(WRITE_WATCHER, false);
   619   }
   620 }
   622 void
   623 UnixSocketImpl::OnListening()
   624 {
   625   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   626   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
   628   if (!mConnector->SetUpListenSocket(GetFd())) {
   629     NS_WARNING("Could not set up listen socket!");
   630     FireSocketError();
   631     return;
   632   }
   634   AddWatchers(READ_WATCHER, true);
   635 }
   637 void
   638 UnixSocketImpl::OnError(const char* aFunction, int aErrno)
   639 {
   640   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   642   UnixFdWatcher::OnError(aFunction, aErrno);
   643   FireSocketError();
   644 }
   646 void
   647 UnixSocketImpl::OnSocketCanReceiveWithoutBlocking()
   648 {
   649   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   650   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
   652   // Read all of the incoming data.
   653   while (true) {
   654     nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE));
   656     ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize);
   657     if (ret <= 0) {
   658       if (ret == -1) {
   659         if (errno == EINTR) {
   660           continue; // retry system call when interrupted
   661         }
   662         if (errno == EAGAIN || errno == EWOULDBLOCK) {
   663           return; // no data available: return and re-poll
   664         }
   666 #ifdef DEBUG
   667         NS_WARNING("Cannot read from network");
   668 #endif
   669         // else fall through to error handling on other errno's
   670       }
   672       // We're done with our descriptors. Ensure that spurious events don't
   673       // cause us to end up back here.
   674       RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
   675       nsRefPtr<RequestClosingSocketRunnable> r =
   676         new RequestClosingSocketRunnable(this);
   677       NS_DispatchToMainThread(r);
   678       return;
   679     }
   681 #ifdef MOZ_TASK_TRACER
   682     // Make unix socket creation events to be the source events of TaskTracer,
   683     // and originate the rest correlation tasks from here.
   684     AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET);
   685 #endif
   687     incoming->mSize = ret;
   688     nsRefPtr<SocketReceiveRunnable> r =
   689       new SocketReceiveRunnable(this, incoming.forget());
   690     NS_DispatchToMainThread(r);
   692     // If ret is less than MAX_READ_SIZE, there's no
   693     // more data in the socket for us to read now.
   694     if (ret < ssize_t(MAX_READ_SIZE)) {
   695       return;
   696     }
   697   }
   698 }
   700 void
   701 UnixSocketImpl::OnSocketCanSendWithoutBlocking()
   702 {
   703   MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
   704   MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
   706   // Try to write the bytes of mCurrentRilRawData.  If all were written, continue.
   707   //
   708   // Otherwise, save the byte position of the next byte to write
   709   // within mCurrentWriteOffset, and request another write when the
   710   // system won't block.
   711   //
   712   while (true) {
   713     UnixSocketRawData* data;
   714     if (mOutgoingQ.IsEmpty()) {
   715       return;
   716     }
   717     data = mOutgoingQ.ElementAt(0);
   718     const uint8_t *toWrite;
   719     toWrite = data->mData;
   721     while (data->mCurrentWriteOffset < data->mSize) {
   722       ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
   723       ssize_t written;
   724       written = write (GetFd(), toWrite + data->mCurrentWriteOffset,
   725                          write_amount);
   726       if (written > 0) {
   727         data->mCurrentWriteOffset += written;
   728       }
   729       if (written != write_amount) {
   730         break;
   731       }
   732     }
   734     if (data->mCurrentWriteOffset != data->mSize) {
   735       AddWatchers(WRITE_WATCHER, false);
   736       return;
   737     }
   738     mOutgoingQ.RemoveElementAt(0);
   739     delete data;
   740   }
   741 }
   743 UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr)
   744                                          , mConnectionStatus(SOCKET_DISCONNECTED)
   745                                          , mConnectTimestamp(0)
   746                                          , mConnectDelayMs(0)
   747 {
   748 }
   750 UnixSocketConsumer::~UnixSocketConsumer()
   751 {
   752   MOZ_ASSERT(mConnectionStatus == SOCKET_DISCONNECTED);
   753   MOZ_ASSERT(!mImpl);
   754 }
   756 bool
   757 UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData)
   758 {
   759   MOZ_ASSERT(NS_IsMainThread());
   760   if (!mImpl) {
   761     return false;
   762   }
   764   MOZ_ASSERT(!mImpl->IsShutdownOnMainThread());
   765   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
   766                                    new SocketSendTask(mImpl, this, aData));
   767   return true;
   768 }
   770 bool
   771 UnixSocketConsumer::SendSocketData(const nsACString& aStr)
   772 {
   773   MOZ_ASSERT(NS_IsMainThread());
   774   if (!mImpl) {
   775     return false;
   776   }
   777   if (aStr.Length() > MAX_READ_SIZE) {
   778     return false;
   779   }
   781   MOZ_ASSERT(!mImpl->IsShutdownOnMainThread());
   782   UnixSocketRawData* d = new UnixSocketRawData(aStr.BeginReading(),
   783                                                aStr.Length());
   784   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
   785                                    new SocketSendTask(mImpl, this, d));
   786   return true;
   787 }
   789 void
   790 UnixSocketConsumer::CloseSocket()
   791 {
   792   MOZ_ASSERT(NS_IsMainThread());
   793   if (!mImpl) {
   794     return;
   795   }
   797   mImpl->CancelDelayedConnectTask();
   799   // From this point on, we consider mImpl as being deleted.
   800   // We sever the relationship here so any future calls to listen or connect
   801   // will create a new implementation.
   802   mImpl->ShutdownOnMainThread();
   804   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
   805                                    new ShutdownSocketTask(mImpl));
   807   mImpl = nullptr;
   809   NotifyDisconnect();
   810 }
   812 void
   813 UnixSocketConsumer::GetSocketAddr(nsAString& aAddrStr)
   814 {
   815   aAddrStr.Truncate();
   816   if (!mImpl || mConnectionStatus != SOCKET_CONNECTED) {
   817     NS_WARNING("No socket currently open!");
   818     return;
   819   }
   820   mImpl->GetSocketAddr(aAddrStr);
   821 }
   823 void
   824 UnixSocketConsumer::NotifySuccess()
   825 {
   826   MOZ_ASSERT(NS_IsMainThread());
   827   mConnectionStatus = SOCKET_CONNECTED;
   828   mConnectTimestamp = PR_IntervalNow();
   829   OnConnectSuccess();
   830 }
   832 void
   833 UnixSocketConsumer::NotifyError()
   834 {
   835   MOZ_ASSERT(NS_IsMainThread());
   836   mConnectionStatus = SOCKET_DISCONNECTED;
   837   mConnectDelayMs = CalculateConnectDelayMs();
   838   OnConnectError();
   839 }
   841 void
   842 UnixSocketConsumer::NotifyDisconnect()
   843 {
   844   MOZ_ASSERT(NS_IsMainThread());
   845   mConnectionStatus = SOCKET_DISCONNECTED;
   846   mConnectDelayMs = CalculateConnectDelayMs();
   847   OnDisconnect();
   848 }
   850 bool
   851 UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
   852                                   const char* aAddress,
   853                                   int aDelayMs)
   854 {
   855   MOZ_ASSERT(aConnector);
   856   MOZ_ASSERT(NS_IsMainThread());
   858   nsAutoPtr<UnixSocketConnector> connector(aConnector);
   860   if (mImpl) {
   861     NS_WARNING("Socket already connecting/connected!");
   862     return false;
   863   }
   865   nsCString addr(aAddress);
   866   MessageLoop* ioLoop = XRE_GetIOMessageLoop();
   867   mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr);
   868   mConnectionStatus = SOCKET_CONNECTING;
   869   if (aDelayMs > 0) {
   870     SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl);
   871     mImpl->SetDelayedConnectTask(connectTask);
   872     MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs);
   873   } else {
   874     ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl));
   875   }
   876   return true;
   877 }
   879 bool
   880 UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
   881 {
   882   MOZ_ASSERT(aConnector);
   883   MOZ_ASSERT(NS_IsMainThread());
   885   nsAutoPtr<UnixSocketConnector> connector(aConnector);
   887   if (mImpl) {
   888     NS_WARNING("Socket already connecting/connected!");
   889     return false;
   890   }
   892   mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(),
   893                              EmptyCString());
   894   mConnectionStatus = SOCKET_LISTENING;
   895   XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
   896                                    new SocketListenTask(mImpl));
   897   return true;
   898 }
   900 uint32_t
   901 UnixSocketConsumer::CalculateConnectDelayMs() const
   902 {
   903   MOZ_ASSERT(NS_IsMainThread());
   905   uint32_t connectDelayMs = mConnectDelayMs;
   907   if ((PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) {
   908     // reset delay if connection has been opened for a while, or...
   909     connectDelayMs = 0;
   910   } else if (!connectDelayMs) {
   911     // ...start with a delay of ~1 sec, or...
   912     connectDelayMs = 1<<10;
   913   } else if (connectDelayMs < (1<<16)) {
   914     // ...otherwise increase delay by a factor of 2
   915     connectDelayMs <<= 1;
   916   }
   917   return connectDelayMs;
   918 }
   920 } // namespace ipc
   921 } // namespace mozilla

mercurial