netwerk/sctp/datachannel/DataChannel.cpp

Wed, 31 Dec 2014 06:55:46 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:55:46 +0100
changeset 1
ca08bd8f51b2
permissions
-rw-r--r--

Added tag TORBROWSER_REPLICA for changeset 6474c204b198

     1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     2 /* vim: set ts=2 et sw=2 tw=80: */
     3 /* This Source Code Form is subject to the terms of the Mozilla Public
     4  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
     5  * You can obtain one at http://mozilla.org/MPL/2.0/. */
     7 #include <stdio.h>
     8 #include <stdlib.h>
     9 #if !defined(__Userspace_os_Windows)
    10 #include <arpa/inet.h>
    11 #endif
    12 // usrsctp.h expects to have errno definitions prior to its inclusion.
    13 #include <errno.h>
    15 #define SCTP_DEBUG 1
    16 #define SCTP_STDINT_INCLUDE <stdint.h>
    18 #ifdef _MSC_VER
    19 // Disable "warning C4200: nonstandard extension used : zero-sized array in
    20 //          struct/union"
    21 // ...which the third-party file usrsctp.h runs afoul of.
    22 #pragma warning(push)
    23 #pragma warning(disable:4200)
    24 #endif
    26 #include "usrsctp.h"
    28 #ifdef _MSC_VER
    29 #pragma warning(pop)
    30 #endif
    32 #include "DataChannelLog.h"
    34 #include "nsServiceManagerUtils.h"
    35 #include "nsIObserverService.h"
    36 #include "nsIObserver.h"
    37 #include "mozilla/Services.h"
    38 #include "nsProxyRelease.h"
    39 #include "nsThread.h"
    40 #include "nsThreadUtils.h"
    41 #include "nsAutoPtr.h"
    42 #include "nsNetUtil.h"
    43 #include "mozilla/StaticPtr.h"
    44 #ifdef MOZ_PEERCONNECTION
    45 #include "mtransport/runnable_utils.h"
    46 #endif
    48 #define DATACHANNEL_LOG(args) LOG(args)
    49 #include "DataChannel.h"
    50 #include "DataChannelProtocol.h"
    52 #ifdef PR_LOGGING
    53 PRLogModuleInfo*
    54 GetDataChannelLog()
    55 {
    56   static PRLogModuleInfo* sLog;
    57   if (!sLog)
    58     sLog = PR_NewLogModule("DataChannel");
    59   return sLog;
    60 }
    62 PRLogModuleInfo*
    63 GetSCTPLog()
    64 {
    65   static PRLogModuleInfo* sLog;
    66   if (!sLog)
    67     sLog = PR_NewLogModule("SCTP");
    68   return sLog;
    69 }
    70 #endif
    72 // Let us turn on and off important assertions in non-debug builds
    73 #ifdef DEBUG
    74 #define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
    75 #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
    76 #define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
    77 #endif
    79 static bool sctp_initialized;
    81 namespace mozilla {
    83 class DataChannelShutdown;
    84 StaticRefPtr<DataChannelShutdown> gDataChannelShutdown;
    86 class DataChannelShutdown : public nsIObserver
    87 {
    88 public:
    89   // This needs to be tied to some form object that is guaranteed to be
    90   // around (singleton likely) unless we want to shutdown sctp whenever
    91   // we're not using it (and in which case we'd keep a refcnt'd object
    92   // ref'd by each DataChannelConnection to release the SCTP usrlib via
    93   // sctp_finish)
    95   NS_DECL_ISUPPORTS
    97   DataChannelShutdown() {}
    99   void Init()
   100     {
   101       nsCOMPtr<nsIObserverService> observerService =
   102         mozilla::services::GetObserverService();
   103       if (!observerService)
   104         return;
   106       nsresult rv = observerService->AddObserver(this,
   107                                                  "profile-change-net-teardown",
   108                                                  false);
   109       MOZ_ASSERT(rv == NS_OK);
   110       (void) rv;
   111     }
   113   virtual ~DataChannelShutdown()
   114     {
   115       nsCOMPtr<nsIObserverService> observerService =
   116         mozilla::services::GetObserverService();
   117       if (observerService)
   118         observerService->RemoveObserver(this, "profile-change-net-teardown");
   119     }
   121   NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic,
   122                         const char16_t* aData) {
   123     if (strcmp(aTopic, "profile-change-net-teardown") == 0) {
   124       LOG(("Shutting down SCTP"));
   125       if (sctp_initialized) {
   126         usrsctp_finish();
   127         sctp_initialized = false;
   128       }
   129       nsCOMPtr<nsIObserverService> observerService =
   130         mozilla::services::GetObserverService();
   131       if (!observerService)
   132         return NS_ERROR_FAILURE;
   134       nsresult rv = observerService->RemoveObserver(this,
   135                                                     "profile-change-net-teardown");
   136       MOZ_ASSERT(rv == NS_OK);
   137       (void) rv;
   139       nsRefPtr<DataChannelShutdown> kungFuDeathGrip(this);
   140       gDataChannelShutdown = nullptr;
   141     }
   142     return NS_OK;
   143   }
   144 };
   146 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
   149 BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
   150                          uint32_t length) : mLength(length)
   151 {
   152   mSpa = new sctp_sendv_spa;
   153   *mSpa = spa;
   154   char *tmp = new char[length]; // infallible malloc!
   155   memcpy(tmp, data, length);
   156   mData = tmp;
   157 }
   159 BufferedMsg::~BufferedMsg()
   160 {
   161   delete mSpa;
   162   delete mData;
   163 }
   165 static int
   166 receive_cb(struct socket* sock, union sctp_sockstore addr,
   167            void *data, size_t datalen,
   168            struct sctp_rcvinfo rcv, int flags, void *ulp_info)
   169 {
   170   DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
   171   return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
   172 }
   174 #ifdef PR_LOGGING
   175 static void
   176 debug_printf(const char *format, ...)
   177 {
   178   va_list ap;
   179   char buffer[1024];
   181   if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) {
   182     va_start(ap, format);
   183 #ifdef _WIN32
   184     if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
   185 #else
   186     if (vsnprintf(buffer, sizeof(buffer), format, ap) > 0) {
   187 #endif
   188       PR_LogPrint("%s", buffer);
   189     }
   190     va_end(ap);
   191   }
   192 }
   193 #endif
   195 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
   196    mLock("netwerk::sctp::DataChannelConnection")
   197 {
   198   mState = CLOSED;
   199   mSocket = nullptr;
   200   mMasterSocket = nullptr;
   201   mListener = listener->asWeakPtr();
   202   mLocalPort = 0;
   203   mRemotePort = 0;
   204   mDeferTimeout = 10;
   205   mTimerRunning = false;
   206   LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
   207   mInternalIOThread = nullptr;
   208 }
   210 DataChannelConnection::~DataChannelConnection()
   211 {
   212   LOG(("Deleting DataChannelConnection %p", (void *) this));
   213   // This may die on the MainThread, or on the STS thread
   214   ASSERT_WEBRTC(mState == CLOSED);
   215   MOZ_ASSERT(!mMasterSocket);
   216   MOZ_ASSERT(mPending.GetSize() == 0);
   218   // Already disconnected from sigslot/mTransportFlow
   219   // TransportFlows must be released from the STS thread
   220   if (!IsSTSThread()) {
   221     ASSERT_WEBRTC(NS_IsMainThread());
   222     if (mTransportFlow) {
   223       ASSERT_WEBRTC(mSTS);
   224       NS_ProxyRelease(mSTS, mTransportFlow);
   225     }
   227     if (mInternalIOThread) {
   228       // Avoid spinning the event thread from here (which if we're mainthread
   229       // is in the event loop already)
   230       NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
   231                                            &nsIThread::Shutdown),
   232                               NS_DISPATCH_NORMAL);
   233     }
   234   } else {
   235     // on STS, safe to call shutdown
   236     if (mInternalIOThread) {
   237       mInternalIOThread->Shutdown();
   238     }
   239   }
   240 }
   242 void
   243 DataChannelConnection::Destroy()
   244 {
   245   // Though it's probably ok to do this and close the sockets;
   246   // if we really want it to do true clean shutdowns it can
   247   // create a dependant Internal object that would remain around
   248   // until the network shut down the association or timed out.
   249   LOG(("Destroying DataChannelConnection %p", (void *) this));
   250   ASSERT_WEBRTC(NS_IsMainThread());
   251   CloseAll();
   253   MutexAutoLock lock(mLock);
   254   // If we had a pending reset, we aren't waiting for it - clear the list so
   255   // we can deregister this DataChannelConnection without leaking.
   256   ClearResets();
   258   MOZ_ASSERT(mSTS);
   259   ASSERT_WEBRTC(NS_IsMainThread());
   260   // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
   261   // the usrsctp_close() calls can move back here (and just proxy the
   262   // disconnect_all())
   263   RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this),
   264                                    &DataChannelConnection::DestroyOnSTS,
   265                                    mSocket, mMasterSocket),
   266                 NS_DISPATCH_NORMAL);
   268   // These will be released on STS
   269   mSocket = nullptr;
   270   mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
   272   // Must do this in Destroy() since we may then delete this object
   273   if (mUsingDtls) {
   274     usrsctp_deregister_address(static_cast<void *>(this));
   275     LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
   276   }
   278   // We can't get any more new callbacks from the SCTP library
   279   // All existing callbacks have refs to DataChannelConnection
   281   // nsDOMDataChannel objects have refs to DataChannels that have refs to us
   282 }
   284 void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
   285                                          struct socket *aSocket)
   286 {
   287   if (aSocket && aSocket != aMasterSocket)
   288     usrsctp_close(aSocket);
   289   if (aMasterSocket)
   290     usrsctp_close(aMasterSocket);
   292   disconnect_all();
   293 }
   295 NS_IMPL_ISUPPORTS(DataChannelConnection,
   296                   nsITimerCallback)
   298 bool
   299 DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
   300 {
   301   struct sctp_initmsg initmsg;
   302   struct sctp_udpencaps encaps;
   303   struct sctp_assoc_value av;
   304   struct sctp_event event;
   305   socklen_t len;
   307   uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
   308                             SCTP_PEER_ADDR_CHANGE,
   309                             SCTP_REMOTE_ERROR,
   310                             SCTP_SHUTDOWN_EVENT,
   311                             SCTP_ADAPTATION_INDICATION,
   312                             SCTP_SEND_FAILED_EVENT,
   313                             SCTP_STREAM_RESET_EVENT,
   314                             SCTP_STREAM_CHANGE_EVENT};
   315   {
   316     ASSERT_WEBRTC(NS_IsMainThread());
   318     // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
   319     if (!sctp_initialized) {
   320       if (aUsingDtls) {
   321         LOG(("sctp_init(DTLS)"));
   322 #ifdef MOZ_PEERCONNECTION
   323         usrsctp_init(0,
   324                      DataChannelConnection::SctpDtlsOutput,
   325 #ifdef PR_LOGGING
   326                      debug_printf
   327 #else
   328                      nullptr
   329 #endif
   330                     );
   331 #else
   332         NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
   333 #endif
   334       } else {
   335         LOG(("sctp_init(%u)", aPort));
   336         usrsctp_init(aPort,
   337                      nullptr,
   338 #ifdef PR_LOGGING
   339                      debug_printf
   340 #else
   341                      nullptr
   342 #endif
   343                     );
   344       }
   346 #ifdef PR_LOGGING
   347       // Set logging to SCTP:PR_LOG_DEBUG to get SCTP debugs
   348       if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) {
   349         usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
   350       }
   351 #endif
   352       usrsctp_sysctl_set_sctp_blackhole(2);
   353       // ECN is currently not supported by the Firefox code
   354       usrsctp_sysctl_set_sctp_ecn_enable(0);
   355       sctp_initialized = true;
   357       gDataChannelShutdown = new DataChannelShutdown();
   358       gDataChannelShutdown->Init();
   359     }
   360   }
   362   // XXX FIX! make this a global we get once
   363   // Find the STS thread
   364   nsresult rv;
   365   mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
   366   MOZ_ASSERT(NS_SUCCEEDED(rv));
   368   // Open sctp with a callback
   369   if ((mMasterSocket = usrsctp_socket(
   370          aUsingDtls ? AF_CONN : AF_INET,
   371          SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) {
   372     return false;
   373   }
   375   // Make non-blocking for bind/connect.  SCTP over UDP defaults to non-blocking
   376   // in associations for normal IO
   377   if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
   378     LOG(("Couldn't set non_blocking on SCTP socket"));
   379     // We can't handle connect() safely if it will block, not that this will
   380     // even happen.
   381     goto error_cleanup;
   382   }
   384   // Make sure when we close the socket, make sure it doesn't call us back again!
   385   // This would cause it try to use an invalid DataChannelConnection pointer
   386   struct linger l;
   387   l.l_onoff = 1;
   388   l.l_linger = 0;
   389   if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
   390                          (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
   391     LOG(("Couldn't set SO_LINGER on SCTP socket"));
   392     // unsafe to allow it to continue if this fails
   393     goto error_cleanup;
   394   }
   396   // XXX Consider disabling this when we add proper SDP negotiation.
   397   // We may want to leave enabled for supporting 'cloning' of SDP offers, which
   398   // implies re-use of the same pseudo-port number, or forcing a renegotiation.
   399   {
   400     uint32_t on = 1;
   401     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
   402                            (const void *)&on, (socklen_t)sizeof(on)) < 0) {
   403       LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
   404     }
   405     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
   406                            (const void *)&on, (socklen_t)sizeof(on)) < 0) {
   407       LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
   408     }
   409   }
   411   if (!aUsingDtls) {
   412     memset(&encaps, 0, sizeof(encaps));
   413     encaps.sue_address.ss_family = AF_INET;
   414     encaps.sue_port = htons(aPort);
   415     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
   416                            (const void*)&encaps,
   417                            (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
   418       LOG(("*** failed encaps errno %d", errno));
   419       goto error_cleanup;
   420     }
   421     LOG(("SCTP encapsulation local port %d", aPort));
   422   }
   424   av.assoc_id = SCTP_ALL_ASSOC;
   425   av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
   426   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
   427                          (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
   428     LOG(("*** failed enable stream reset errno %d", errno));
   429     goto error_cleanup;
   430   }
   432   /* Enable the events of interest. */
   433   memset(&event, 0, sizeof(event));
   434   event.se_assoc_id = SCTP_ALL_ASSOC;
   435   event.se_on = 1;
   436   for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) {
   437     event.se_type = event_types[i];
   438     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
   439       LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
   440       goto error_cleanup;
   441     }
   442   }
   444   // Update number of streams
   445   mStreams.AppendElements(aNumStreams);
   446   for (uint32_t i = 0; i < aNumStreams; ++i) {
   447     mStreams[i] = nullptr;
   448   }
   449   memset(&initmsg, 0, sizeof(initmsg));
   450   len = sizeof(initmsg);
   451   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
   452     LOG(("*** failed getsockopt SCTP_INITMSG"));
   453     goto error_cleanup;
   454   }
   455   LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
   456        initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
   457   initmsg.sinit_num_ostreams  = aNumStreams;
   458   initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
   459   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
   460                          (socklen_t)sizeof(initmsg)) < 0) {
   461     LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
   462     goto error_cleanup;
   463   }
   465   mSocket = nullptr;
   466   if (aUsingDtls) {
   467     mUsingDtls = true;
   468     usrsctp_register_address(static_cast<void *>(this));
   469     LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
   470   } else {
   471     mUsingDtls = false;
   472   }
   473   return true;
   475 error_cleanup:
   476   usrsctp_close(mMasterSocket);
   477   mMasterSocket = nullptr;
   478   mUsingDtls = false;
   479   return false;
   480 }
   482 void
   483 DataChannelConnection::StartDefer()
   484 {
   485   nsresult rv;
   486   if (!NS_IsMainThread()) {
   487     NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
   488                             DataChannelOnMessageAvailable::START_DEFER,
   489                             this, (DataChannel *) nullptr));
   490     return;
   491   }
   493   ASSERT_WEBRTC(NS_IsMainThread());
   494   if (!mDeferredTimer) {
   495     mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
   496     MOZ_ASSERT(mDeferredTimer);
   497   }
   499   if (!mTimerRunning) {
   500     rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
   501                                           nsITimer::TYPE_ONE_SHOT);
   502     NS_ENSURE_TRUE_VOID(rv == NS_OK);
   504     mTimerRunning = true;
   505   }
   506 }
   508 // nsITimerCallback
   510 NS_IMETHODIMP
   511 DataChannelConnection::Notify(nsITimer *timer)
   512 {
   513   ASSERT_WEBRTC(NS_IsMainThread());
   514   LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout));
   516   if (timer == mDeferredTimer) {
   517     if (SendDeferredMessages()) {
   518       // Still blocked
   519       // we don't need a lock, since this must be main thread...
   520       nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
   521                                                      nsITimer::TYPE_ONE_SHOT);
   522       if (NS_FAILED(rv)) {
   523         LOG(("%s: cannot initialize open timer", __FUNCTION__));
   524         // XXX and do....?
   525         return rv;
   526       }
   527       mTimerRunning = true;
   528     } else {
   529       LOG(("Turned off deferred send timer"));
   530       mTimerRunning = false;
   531     }
   532   }
   533   return NS_OK;
   534 }
   536 #ifdef MOZ_PEERCONNECTION
   537 void
   538 DataChannelConnection::SetEvenOdd()
   539 {
   540   ASSERT_WEBRTC(IsSTSThread());
   542   TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
   543       mTransportFlow->GetLayer(TransportLayerDtls::ID()));
   544   MOZ_ASSERT(dtls);  // DTLS is mandatory
   545   mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
   546 }
   548 bool
   549 DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
   550 {
   551   LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
   553   NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
   554   NS_ENSURE_TRUE(aFlow, false);
   556   mTransportFlow = aFlow;
   557   mLocalPort = localport;
   558   mRemotePort = remoteport;
   559   mState = CONNECTING;
   561   RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this),
   562                                    &DataChannelConnection::SetSignals),
   563                 NS_DISPATCH_NORMAL);
   564   return true;
   565 }
   567 void
   568 DataChannelConnection::SetSignals()
   569 {
   570   ASSERT_WEBRTC(IsSTSThread());
   571   ASSERT_WEBRTC(mTransportFlow);
   572   LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
   573   mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
   574   // SignalStateChange() doesn't call you with the initial state
   575   mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
   576   CompleteConnect(mTransportFlow, mTransportFlow->state());
   577 }
   579 void
   580 DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
   581 {
   582   LOG(("Data transport state: %d", state));
   583   MutexAutoLock lock(mLock);
   584   ASSERT_WEBRTC(IsSTSThread());
   585   // We should abort connection on TS_ERROR.
   586   // Note however that the association will also fail (perhaps with a delay) and
   587   // notify us in that way
   588   if (state != TransportLayer::TS_OPEN || !mMasterSocket)
   589     return;
   591   struct sockaddr_conn addr;
   592   memset(&addr, 0, sizeof(addr));
   593   addr.sconn_family = AF_CONN;
   594 #if defined(__Userspace_os_Darwin)
   595   addr.sconn_len = sizeof(addr);
   596 #endif
   597   addr.sconn_port = htons(mLocalPort);
   598   addr.sconn_addr = static_cast<void *>(this);
   600   LOG(("Calling usrsctp_bind"));
   601   int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
   602                        sizeof(addr));
   603   if (r < 0) {
   604     LOG(("usrsctp_bind failed: %d", r));
   605   } else {
   606     // This is the remote addr
   607     addr.sconn_port = htons(mRemotePort);
   608     LOG(("Calling usrsctp_connect"));
   609     r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
   610                         sizeof(addr));
   611     if (r < 0) {
   612       if (errno == EINPROGRESS) {
   613         // non-blocking
   614         return;
   615       } else {
   616         LOG(("usrsctp_connect failed: %d", errno));
   617         mState = CLOSED;
   618       }
   619     } else {
   620       // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
   621       // This also avoids issues with calling TransportFlow stuff on Mainthread
   622       return;
   623     }
   624   }
   625   // Note: currently this doesn't actually notify the application
   626   NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
   627                             DataChannelOnMessageAvailable::ON_CONNECTION,
   628                             this, false));
   629   return;
   630 }
   632 // Process any pending Opens
   633 void
   634 DataChannelConnection::ProcessQueuedOpens()
   635 {
   636   // The nsDeque holds channels with an AddRef applied.  Another reference
   637   // (may) be held by the DOMDataChannel, unless it's been GC'd.  No other
   638   // references should exist.
   640   // Can't copy nsDeque's.  Move into temp array since any that fail will
   641   // go back to mPending
   642   nsDeque temp;
   643   DataChannel *temp_channel; // really already_AddRefed<>
   644   while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
   645     temp.Push(static_cast<void *>(temp_channel));
   646   }
   648   nsRefPtr<DataChannel> channel;
   649   // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
   650   while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
   651     if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
   652       LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
   653       channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
   654       // OpenFinish returns a reference itself, so we need to take it can Release it
   655       channel = OpenFinish(channel.forget()); // may reset the flag and re-push
   656     } else {
   657       NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
   658     }
   659   }
   661 }
   662 void
   663 DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
   664                                      const unsigned char *data, size_t len)
   665 {
   666 #ifdef PR_LOGGING
   667   if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) {
   668     char *buf;
   670     if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
   671       PR_LogPrint("%s", buf);
   672       usrsctp_freedumpbuffer(buf);
   673     }
   674   }
   675 #endif
   676   // Pass the data to SCTP
   677   usrsctp_conninput(static_cast<void *>(this), data, len, 0);
   678 }
   680 int
   681 DataChannelConnection::SendPacket(const unsigned char *data, size_t len, bool release)
   682 {
   683   //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
   684   int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
   685   if (release)
   686     delete data;
   687   return res;
   688 }
   690 /* static */
   691 int
   692 DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
   693                                       uint8_t tos, uint8_t set_df)
   694 {
   695   DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
   696   int res;
   698 #ifdef PR_LOGGING
   699   if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) {
   700     char *buf;
   702     if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
   703       PR_LogPrint("%s", buf);
   704       usrsctp_freedumpbuffer(buf);
   705     }
   706   }
   707 #endif
   708   // We're async proxying even if on the STSThread because this is called
   709   // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
   710   // SCTP has an option for Apple, on IP connections only, to release at least
   711   // one of the locks before calling a packet output routine; with changes to
   712   // the underlying SCTP stack this might remove the need to use an async proxy.
   713   if (0 /*peer->IsSTSThread()*/) {
   714     res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
   715   } else {
   716     unsigned char *data = new unsigned char[length];
   717     memcpy(data, buffer, length);
   718     res = -1;
   719     // XXX It might be worthwhile to add an assertion against the thread
   720     // somehow getting into the DataChannel/SCTP code again, as
   721     // DISPATCH_SYNC is not fully blocking.  This may be tricky, as it
   722     // needs to be a per-thread check, not a global.
   723     peer->mSTS->Dispatch(WrapRunnable(
   724                            nsRefPtr<DataChannelConnection>(peer),
   725                            &DataChannelConnection::SendPacket, data, length, true),
   726                          NS_DISPATCH_NORMAL);
   727     res = 0; // cheat!  Packets can always be dropped later anyways
   728   }
   729   return res;
   730 }
   731 #endif
   733 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
   734 // listen for incoming associations
   735 // Blocks! - Don't call this from main thread!
   737 #error This code will not work as-is since SetEvenOdd() runs on Mainthread
   739 bool
   740 DataChannelConnection::Listen(unsigned short port)
   741 {
   742   struct sockaddr_in addr;
   743   socklen_t addr_len;
   745   NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
   747   /* Acting as the 'server' */
   748   memset((void *)&addr, 0, sizeof(addr));
   749 #ifdef HAVE_SIN_LEN
   750   addr.sin_len = sizeof(struct sockaddr_in);
   751 #endif
   752   addr.sin_family = AF_INET;
   753   addr.sin_port = htons(port);
   754   addr.sin_addr.s_addr = htonl(INADDR_ANY);
   755   LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
   756   mState = CONNECTING;
   757   if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
   758     LOG(("***Failed userspace_bind"));
   759     return false;
   760   }
   761   if (usrsctp_listen(mMasterSocket, 1) < 0) {
   762     LOG(("***Failed userspace_listen"));
   763     return false;
   764   }
   766   LOG(("Accepting connection"));
   767   addr_len = 0;
   768   if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
   769     LOG(("***Failed accept"));
   770     return false;
   771   }
   772   mState = OPEN;
   774   struct linger l;
   775   l.l_onoff = 1;
   776   l.l_linger = 0;
   777   if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
   778                          (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
   779     LOG(("Couldn't set SO_LINGER on SCTP socket"));
   780   }
   782   SetEvenOdd();
   784   // Notify Connection open
   785   // XXX We need to make sure connection sticks around until the message is delivered
   786   LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
   787   NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
   788                             DataChannelOnMessageAvailable::ON_CONNECTION,
   789                             this, (DataChannel *) nullptr));
   790   return true;
   791 }
   793 // Blocks! - Don't call this from main thread!
   794 bool
   795 DataChannelConnection::Connect(const char *addr, unsigned short port)
   796 {
   797   struct sockaddr_in addr4;
   798   struct sockaddr_in6 addr6;
   800   NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
   802   /* Acting as the connector */
   803   LOG(("Connecting to %s, port %u", addr, port));
   804   memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
   805   memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
   806 #ifdef HAVE_SIN_LEN
   807   addr4.sin_len = sizeof(struct sockaddr_in);
   808 #endif
   809 #ifdef HAVE_SIN6_LEN
   810   addr6.sin6_len = sizeof(struct sockaddr_in6);
   811 #endif
   812   addr4.sin_family = AF_INET;
   813   addr6.sin6_family = AF_INET6;
   814   addr4.sin_port = htons(port);
   815   addr6.sin6_port = htons(port);
   816   mState = CONNECTING;
   818 #if !defined(__Userspace_os_Windows)
   819   if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
   820     if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
   821       LOG(("*** Failed userspace_connect"));
   822       return false;
   823     }
   824   } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
   825     if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
   826       LOG(("*** Failed userspace_connect"));
   827       return false;
   828     }
   829   } else {
   830     LOG(("*** Illegal destination address."));
   831   }
   832 #else
   833   {
   834     struct sockaddr_storage ss;
   835     int sslen = sizeof(ss);
   837     if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
   838       addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
   839       if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
   840         LOG(("*** Failed userspace_connect"));
   841         return false;
   842       }
   843     } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
   844       addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
   845       if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
   846         LOG(("*** Failed userspace_connect"));
   847         return false;
   848       }
   849     } else {
   850       LOG(("*** Illegal destination address."));
   851     }
   852   }
   853 #endif
   855   mSocket = mMasterSocket;
   857   LOG(("connect() succeeded!  Entering connected mode"));
   858   mState = OPEN;
   860   SetEvenOdd();
   862   // Notify Connection open
   863   // XXX We need to make sure connection sticks around until the message is delivered
   864   LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
   865   NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
   866                             DataChannelOnMessageAvailable::ON_CONNECTION,
   867                             this, (DataChannel *) nullptr));
   868   return true;
   869 }
   870 #endif
   872 DataChannel *
   873 DataChannelConnection::FindChannelByStream(uint16_t stream)
   874 {
   875   return mStreams.SafeElementAt(stream);
   876 }
   878 uint16_t
   879 DataChannelConnection::FindFreeStream()
   880 {
   881   uint32_t i, j, limit;
   883   limit = mStreams.Length();
   884   if (limit > MAX_NUM_STREAMS)
   885     limit = MAX_NUM_STREAMS;
   887   for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
   888     if (!mStreams[i]) {
   889       // Verify it's not still in the process of closing
   890       for (j = 0; j < mStreamsResetting.Length(); ++j) {
   891         if (mStreamsResetting[j] == i) {
   892           break;
   893         }
   894       }
   895       if (j == mStreamsResetting.Length())
   896         break;
   897     }
   898   }
   899   if (i >= limit) {
   900     return INVALID_STREAM;
   901   }
   902   return i;
   903 }
   905 bool
   906 DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
   907 {
   908   struct sctp_status status;
   909   struct sctp_add_streams sas;
   910   uint32_t outStreamsNeeded;
   911   socklen_t len;
   913   if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
   914     aNeeded = MAX_NUM_STREAMS - mStreams.Length();
   915   }
   916   if (aNeeded <= 0) {
   917     return false;
   918   }
   920   len = (socklen_t)sizeof(struct sctp_status);
   921   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
   922     LOG(("***failed: getsockopt SCTP_STATUS"));
   923     return false;
   924   }
   925   outStreamsNeeded = aNeeded; // number to add
   927   // Note: if multiple channel opens happen when we don't have enough space,
   928   // we'll call RequestMoreStreams() multiple times
   929   memset(&sas, 0, sizeof(sas));
   930   sas.sas_instrms = 0;
   931   sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
   932   // Doesn't block, we get an event when it succeeds or fails
   933   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
   934                          (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
   935     if (errno == EALREADY) {
   936       LOG(("Already have %u output streams", outStreamsNeeded));
   937       return true;
   938     }
   940     LOG(("***failed: setsockopt ADD errno=%d", errno));
   941     return false;
   942   }
   943   LOG(("Requested %u more streams", outStreamsNeeded));
   944   // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
   945   // values are larger than mStreams.Length()
   946   return true;
   947 }
   949 int32_t
   950 DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
   951 {
   952   struct sctp_sndinfo sndinfo;
   954   // Note: Main-thread IO, but doesn't block
   955   memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
   956   sndinfo.snd_sid = stream;
   957   sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
   958   if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
   959                     &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
   960                     SCTP_SENDV_SNDINFO, 0) < 0) {
   961     //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
   962     return (0);
   963   }
   964   return (1);
   965 }
   967 int32_t
   968 DataChannelConnection::SendOpenAckMessage(uint16_t stream)
   969 {
   970   struct rtcweb_datachannel_ack ack;
   972   memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
   973   ack.msg_type = DATA_CHANNEL_ACK;
   975   return SendControlMessage(&ack, sizeof(ack), stream);
   976 }
   978 int32_t
   979 DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
   980                                               const nsACString& protocol,
   981                                               uint16_t stream, bool unordered,
   982                                               uint16_t prPolicy, uint32_t prValue)
   983 {
   984   int label_len = label.Length(); // not including nul
   985   int proto_len = protocol.Length(); // not including nul
   986   struct rtcweb_datachannel_open_request *req =
   987     (struct rtcweb_datachannel_open_request*) moz_xmalloc((sizeof(*req)-1) + label_len + proto_len);
   988    // careful - request includes 1 char label
   990   memset(req, 0, sizeof(struct rtcweb_datachannel_open_request));
   991   req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
   992   switch (prPolicy) {
   993   case SCTP_PR_SCTP_NONE:
   994     req->channel_type = DATA_CHANNEL_RELIABLE;
   995     break;
   996   case SCTP_PR_SCTP_TTL:
   997     req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
   998     break;
   999   case SCTP_PR_SCTP_RTX:
  1000     req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
  1001     break;
  1002   default:
  1003     // FIX! need to set errno!  Or make all these SendXxxx() funcs return 0 or errno!
  1004     moz_free(req);
  1005     return (0);
  1007   if (unordered) {
  1008     // Per the current types, all differ by 0x80 between ordered and unordered
  1009     req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
  1012   req->reliability_param = htonl(prValue);
  1013   req->priority = htons(0); /* XXX: add support */
  1014   req->label_length = htons(label_len);
  1015   req->protocol_length = htons(proto_len);
  1016   memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
  1017   memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
  1019   // sizeof(*req) already includes +1 byte for label, need nul for both strings
  1020   int32_t result = SendControlMessage(req, (sizeof(*req)-1) + label_len + proto_len, stream);
  1022   moz_free(req);
  1023   return result;
  1026 // XXX This should use a separate thread (outbound queue) which should
  1027 // select() to know when to *try* to send data to the socket again.
  1028 // Alternatively, it can use a timeout, but that's guaranteed to be wrong
  1029 // (just not sure in what direction).  We could re-implement NSPR's
  1030 // PR_POLL_WRITE/etc handling... with a lot of work.
  1032 // Better yet, use the SCTP stack's notifications on buffer state to avoid
  1033 // filling the SCTP's buffers.
  1035 // returns if we're still blocked or not
  1036 bool
  1037 DataChannelConnection::SendDeferredMessages()
  1039   uint32_t i;
  1040   nsRefPtr<DataChannel> channel; // we may null out the refs to this
  1041   bool still_blocked = false;
  1042   bool sent = false;
  1044   // This may block while something is modifying channels, but should not block for IO
  1045   MutexAutoLock lock(mLock);
  1047   // XXX For total fairness, on a still_blocked we'd start next time at the
  1048   // same index.  Sorry, not going to bother for now.
  1049   for (i = 0; i < mStreams.Length(); ++i) {
  1050     channel = mStreams[i];
  1051     if (!channel)
  1052       continue;
  1054     // Only one of these should be set....
  1055     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
  1056       if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
  1057                                  channel->mStream,
  1058                                  channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
  1059                                  channel->mPrPolicy, channel->mPrValue)) {
  1060         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
  1062         channel->mState = OPEN;
  1063         channel->mReady = true;
  1064         LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  1065         NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1066                                   DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
  1067                                   channel));
  1068         sent = true;
  1069       } else {
  1070         if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1071           still_blocked = true;
  1072         } else {
  1073           // Close the channel, inform the user
  1074           mStreams[channel->mStream] = nullptr;
  1075           channel->mState = CLOSED;
  1076           // Don't need to reset; we didn't open it
  1077           NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1078                                     DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1079                                     channel));
  1083     if (still_blocked)
  1084       break;
  1086     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
  1087       if (SendOpenAckMessage(channel->mStream)) {
  1088         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
  1089         sent = true;
  1090       } else {
  1091         if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1092           still_blocked = true;
  1093         } else {
  1094           // Close the channel, inform the user
  1095           CloseInt(channel);
  1096           // XXX send error via DataChannelOnMessageAvailable (bug 843625)
  1100     if (still_blocked)
  1101       break;
  1103     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
  1104       bool failed_send = false;
  1105       int32_t result;
  1107       if (channel->mState == CLOSED || channel->mState == CLOSING) {
  1108         channel->mBufferedData.Clear();
  1110       while (!channel->mBufferedData.IsEmpty() &&
  1111              !failed_send) {
  1112         struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
  1113         const char *data           = channel->mBufferedData[0]->mData;
  1114         uint32_t len               = channel->mBufferedData[0]->mLength;
  1116         // SCTP will return EMSGSIZE if the message is bigger than the buffer
  1117         // size (or EAGAIN if there isn't space)
  1118         if ((result = usrsctp_sendv(mSocket, data, len,
  1119                                     nullptr, 0,
  1120                                     (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
  1121                                     SCTP_SENDV_SPA,
  1122                                     0) < 0)) {
  1123           if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1124             // leave queued for resend
  1125             failed_send = true;
  1126             LOG(("queue full again when resending %d bytes (%d)", len, result));
  1127           } else {
  1128             LOG(("error %d re-sending string", errno));
  1129             failed_send = true;
  1131         } else {
  1132           LOG(("Resent buffer of %d bytes (%d)", len, result));
  1133           sent = true;
  1134           channel->mBufferedData.RemoveElementAt(0);
  1137       if (channel->mBufferedData.IsEmpty())
  1138         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
  1139       else
  1140         still_blocked = true;
  1142     if (still_blocked)
  1143       break;
  1146   if (!still_blocked) {
  1147     // mDeferTimeout becomes an estimate of how long we need to wait next time we block
  1148     return false;
  1150   // adjust time?  More time for next wait if we didn't send anything, less if did
  1151   // Pretty crude, but better than nothing; just to keep CPU use down
  1152   if (!sent && mDeferTimeout < 50)
  1153     mDeferTimeout++;
  1154   else if (sent && mDeferTimeout > 10)
  1155     mDeferTimeout--;
  1157   return true;
  1160 void
  1161 DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
  1162                                                 size_t length,
  1163                                                 uint16_t stream)
  1165   nsRefPtr<DataChannel> channel;
  1166   uint32_t prValue;
  1167   uint16_t prPolicy;
  1168   uint32_t flags;
  1170   mLock.AssertCurrentThreadOwns();
  1172   if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
  1173     LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
  1174          (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
  1175     if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
  1176       return;
  1179   LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
  1181   switch (req->channel_type) {
  1182     case DATA_CHANNEL_RELIABLE:
  1183     case DATA_CHANNEL_RELIABLE_UNORDERED:
  1184       prPolicy = SCTP_PR_SCTP_NONE;
  1185       break;
  1186     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
  1187     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
  1188       prPolicy = SCTP_PR_SCTP_RTX;
  1189       break;
  1190     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
  1191     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
  1192       prPolicy = SCTP_PR_SCTP_TTL;
  1193       break;
  1194     default:
  1195       LOG(("Unknown channel type", req->channel_type));
  1196       /* XXX error handling */
  1197       return;
  1199   prValue = ntohl(req->reliability_param);
  1200   flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
  1202   if ((channel = FindChannelByStream(stream))) {
  1203     if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
  1204       LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
  1205            stream, channel->mState));
  1206      /* XXX: some error handling */
  1207     } else {
  1208       LOG(("Open for externally negotiated channel %u", stream));
  1209       // XXX should also check protocol, maybe label
  1210       if (prPolicy != channel->mPrPolicy ||
  1211           prValue != channel->mPrValue ||
  1212           flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
  1214         LOG(("WARNING: external negotiation mismatch with OpenRequest:"
  1215              "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
  1216              stream, prPolicy, channel->mPrPolicy,
  1217              prValue, channel->mPrValue, flags, channel->mFlags));
  1220     return;
  1222   if (stream >= mStreams.Length()) {
  1223     LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
  1224     return;
  1227   nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
  1228   nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
  1229                                            ntohs(req->protocol_length)));
  1231   channel = new DataChannel(this,
  1232                             stream,
  1233                             DataChannel::CONNECTING,
  1234                             label,
  1235                             protocol,
  1236                             prPolicy, prValue,
  1237                             flags,
  1238                             nullptr, nullptr);
  1239   mStreams[stream] = channel;
  1241   channel->mState = DataChannel::WAITING_TO_OPEN;
  1243   LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
  1244        channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
  1245   NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1246                             DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
  1247                             this, channel));
  1249   LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  1251   if (!SendOpenAckMessage(stream)) {
  1252     // XXX Only on EAGAIN!?  And if not, then close the channel??
  1253     channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
  1254     StartDefer();
  1257   // Now process any queued data messages for the channel (which will
  1258   // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
  1259   // more that come in before that happens)
  1260   DeliverQueuedData(stream);
  1263 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
  1264 // That would make this code moot.  Keep it for now for backwards compatibility.
  1265 void
  1266 DataChannelConnection::DeliverQueuedData(uint16_t stream)
  1268   mLock.AssertCurrentThreadOwns();
  1270   uint32_t i = 0;
  1271   while (i < mQueuedData.Length()) {
  1272     // Careful! we may modify the array length from within the loop!
  1273     if (mQueuedData[i]->mStream == stream) {
  1274       LOG(("Delivering queued data for stream %u, length %u",
  1275            stream, mQueuedData[i]->mLength));
  1276       // Deliver the queued data
  1277       HandleDataMessage(mQueuedData[i]->mPpid,
  1278                         mQueuedData[i]->mData, mQueuedData[i]->mLength,
  1279                         mQueuedData[i]->mStream);
  1280       mQueuedData.RemoveElementAt(i);
  1281       continue; // don't bump index since we removed the element
  1283     i++;
  1287 void
  1288 DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
  1289                                             size_t length, uint16_t stream)
  1291   DataChannel *channel;
  1293   mLock.AssertCurrentThreadOwns();
  1295   channel = FindChannelByStream(stream);
  1296   NS_ENSURE_TRUE_VOID(channel);
  1298   LOG(("OpenAck received for stream %u, waiting=%d", stream,
  1299        (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
  1301   channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
  1304 void
  1305 DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
  1307   /* XXX: Send an error message? */
  1308   LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
  1309   // XXX Log to JS error console if possible
  1312 void
  1313 DataChannelConnection::HandleDataMessage(uint32_t ppid,
  1314                                          const void *data, size_t length,
  1315                                          uint16_t stream)
  1317   DataChannel *channel;
  1318   const char *buffer = (const char *) data;
  1320   mLock.AssertCurrentThreadOwns();
  1322   channel = FindChannelByStream(stream);
  1324   // XXX A closed channel may trip this... check
  1325   // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
  1326   // That would make this code moot.  Keep it for now for backwards compatibility.
  1327   if (!channel) {
  1328     // In the updated 0-RTT open case, the sender can send data immediately
  1329     // after Open, and doesn't set the in-order bit (since we don't have a
  1330     // response or ack).  Also, with external negotiation, data can come in
  1331     // before we're told about the external negotiation.  We need to buffer
  1332     // data until either a) Open comes in, if the ordering get messed up,
  1333     // or b) the app tells us this channel was externally negotiated.  When
  1334     // these occur, we deliver the data.
  1336     // Since this is rare and non-performance, keep a single list of queued
  1337     // data messages to deliver once the channel opens.
  1338     LOG(("Queuing data for stream %u, length %u", stream, length));
  1339     // Copies data
  1340     mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
  1341     return;
  1344   // XXX should this be a simple if, no warnings/debugbreaks?
  1345   NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
  1348     nsAutoCString recvData(buffer, length); // copies (<64) or allocates
  1349     bool is_binary = true;
  1351     if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
  1352         ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
  1353       is_binary = false;
  1355     if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
  1356       NS_WARNING("DataChannel message aborted by fragment type change!");
  1357       channel->mRecvBuffer.Truncate(0);
  1359     channel->mIsRecvBinary = is_binary;
  1361     switch (ppid) {
  1362       case DATA_CHANNEL_PPID_DOMSTRING:
  1363       case DATA_CHANNEL_PPID_BINARY:
  1364         channel->mRecvBuffer += recvData;
  1365         LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
  1366              is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
  1367              channel->mStream));
  1368         return; // Not ready to notify application
  1370       case DATA_CHANNEL_PPID_DOMSTRING_LAST:
  1371         LOG(("DataChannel: String message received of length %lu on channel %u",
  1372              length, channel->mStream));
  1373         if (!channel->mRecvBuffer.IsEmpty()) {
  1374           channel->mRecvBuffer += recvData;
  1375           LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
  1376           channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1377                                  DataChannelOnMessageAvailable::ON_DATA, this,
  1378                                  channel, channel->mRecvBuffer, -1));
  1379           channel->mRecvBuffer.Truncate(0);
  1380           return;
  1382         // else send using recvData normally
  1383         length = -1; // Flag for DOMString
  1385         // WebSockets checks IsUTF8() here; we can try to deliver it
  1386         break;
  1388       case DATA_CHANNEL_PPID_BINARY_LAST:
  1389         LOG(("DataChannel: Received binary message of length %lu on channel id %u",
  1390              length, channel->mStream));
  1391         if (!channel->mRecvBuffer.IsEmpty()) {
  1392           channel->mRecvBuffer += recvData;
  1393           LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
  1394           channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1395                                  DataChannelOnMessageAvailable::ON_DATA, this,
  1396                                  channel, channel->mRecvBuffer,
  1397                                  channel->mRecvBuffer.Length()));
  1398           channel->mRecvBuffer.Truncate(0);
  1399           return;
  1401         // else send using recvData normally
  1402         break;
  1404       default:
  1405         NS_ERROR("Unknown data PPID");
  1406         return;
  1408     /* Notify onmessage */
  1409     LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
  1410     channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1411                            DataChannelOnMessageAvailable::ON_DATA, this,
  1412                            channel, recvData, length));
  1416 // Called with mLock locked!
  1417 void
  1418 DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
  1420   const struct rtcweb_datachannel_open_request *req;
  1421   const struct rtcweb_datachannel_ack *ack;
  1423   mLock.AssertCurrentThreadOwns();
  1425   switch (ppid) {
  1426     case DATA_CHANNEL_PPID_CONTROL:
  1427       req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
  1429       NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
  1430       switch (req->msg_type) {
  1431         case DATA_CHANNEL_OPEN_REQUEST:
  1432           // structure includes a possibly-unused char label[1] (in a packed structure)
  1433           NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
  1435           HandleOpenRequestMessage(req, length, stream);
  1436           break;
  1437         case DATA_CHANNEL_ACK:
  1438           // >= sizeof(*ack) checked above
  1440           ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
  1441           HandleOpenAckMessage(ack, length, stream);
  1442           break;
  1443         default:
  1444           HandleUnknownMessage(ppid, length, stream);
  1445           break;
  1447       break;
  1448     case DATA_CHANNEL_PPID_DOMSTRING:
  1449     case DATA_CHANNEL_PPID_DOMSTRING_LAST:
  1450     case DATA_CHANNEL_PPID_BINARY:
  1451     case DATA_CHANNEL_PPID_BINARY_LAST:
  1452       HandleDataMessage(ppid, buffer, length, stream);
  1453       break;
  1454     default:
  1455       LOG(("Message of length %lu, PPID %u on stream %u received.",
  1456            length, ppid, stream));
  1457       break;
  1461 void
  1462 DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
  1464   uint32_t i, n;
  1466   switch (sac->sac_state) {
  1467   case SCTP_COMM_UP:
  1468     LOG(("Association change: SCTP_COMM_UP"));
  1469     if (mState == CONNECTING) {
  1470       mSocket = mMasterSocket;
  1471       mState = OPEN;
  1473       SetEvenOdd();
  1475       NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1476                                 DataChannelOnMessageAvailable::ON_CONNECTION,
  1477                                 this, true));
  1478       LOG(("DTLS connect() succeeded!  Entering connected mode"));
  1480       // Open any streams pending...
  1481       ProcessQueuedOpens();
  1483     } else if (mState == OPEN) {
  1484       LOG(("DataConnection Already OPEN"));
  1485     } else {
  1486       LOG(("Unexpected state: %d", mState));
  1488     break;
  1489   case SCTP_COMM_LOST:
  1490     LOG(("Association change: SCTP_COMM_LOST"));
  1491     // This association is toast, so also close all the channels -- from mainthread!
  1492     NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1493                               DataChannelOnMessageAvailable::ON_DISCONNECTED,
  1494                               this));
  1495     break;
  1496   case SCTP_RESTART:
  1497     LOG(("Association change: SCTP_RESTART"));
  1498     break;
  1499   case SCTP_SHUTDOWN_COMP:
  1500     LOG(("Association change: SCTP_SHUTDOWN_COMP"));
  1501     NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1502                               DataChannelOnMessageAvailable::ON_DISCONNECTED,
  1503                               this));
  1504     break;
  1505   case SCTP_CANT_STR_ASSOC:
  1506     LOG(("Association change: SCTP_CANT_STR_ASSOC"));
  1507     break;
  1508   default:
  1509     LOG(("Association change: UNKNOWN"));
  1510     break;
  1512   LOG(("Association change: streams (in/out) = (%u/%u)",
  1513        sac->sac_inbound_streams, sac->sac_outbound_streams));
  1515   NS_ENSURE_TRUE_VOID(sac);
  1516   n = sac->sac_length - sizeof(*sac);
  1517   if (((sac->sac_state == SCTP_COMM_UP) ||
  1518         (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
  1519     for (i = 0; i < n; ++i) {
  1520       switch (sac->sac_info[i]) {
  1521       case SCTP_ASSOC_SUPPORTS_PR:
  1522         LOG(("Supports: PR"));
  1523         break;
  1524       case SCTP_ASSOC_SUPPORTS_AUTH:
  1525         LOG(("Supports: AUTH"));
  1526         break;
  1527       case SCTP_ASSOC_SUPPORTS_ASCONF:
  1528         LOG(("Supports: ASCONF"));
  1529         break;
  1530       case SCTP_ASSOC_SUPPORTS_MULTIBUF:
  1531         LOG(("Supports: MULTIBUF"));
  1532         break;
  1533       case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
  1534         LOG(("Supports: RE-CONFIG"));
  1535         break;
  1536       default:
  1537         LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
  1538         break;
  1541   } else if (((sac->sac_state == SCTP_COMM_LOST) ||
  1542               (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
  1543     LOG(("Association: ABORT ="));
  1544     for (i = 0; i < n; ++i) {
  1545       LOG((" 0x%02x", sac->sac_info[i]));
  1548   if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
  1549       (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
  1550       (sac->sac_state == SCTP_COMM_LOST)) {
  1551     return;
  1555 void
  1556 DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
  1558   char addr_buf[INET6_ADDRSTRLEN];
  1559   const char *addr = "";
  1560   struct sockaddr_in *sin;
  1561   struct sockaddr_in6 *sin6;
  1562 #if defined(__Userspace_os_Windows)
  1563   DWORD addr_len = INET6_ADDRSTRLEN;
  1564 #endif
  1566   switch (spc->spc_aaddr.ss_family) {
  1567   case AF_INET:
  1568     sin = (struct sockaddr_in *)&spc->spc_aaddr;
  1569 #if !defined(__Userspace_os_Windows)
  1570     addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
  1571 #else
  1572     if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr,
  1573                             addr_buf, &addr_len)) {
  1574       return;
  1576 #endif
  1577     break;
  1578   case AF_INET6:
  1579     sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
  1580 #if !defined(__Userspace_os_Windows)
  1581     addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
  1582 #else
  1583     if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr,
  1584                             addr_buf, &addr_len)) {
  1585       return;
  1587 #endif
  1588   case AF_CONN:
  1589     addr = "DTLS connection";
  1590     break;
  1591   default:
  1592     break;
  1594   LOG(("Peer address %s is now ", addr));
  1595   switch (spc->spc_state) {
  1596   case SCTP_ADDR_AVAILABLE:
  1597     LOG(("SCTP_ADDR_AVAILABLE"));
  1598     break;
  1599   case SCTP_ADDR_UNREACHABLE:
  1600     LOG(("SCTP_ADDR_UNREACHABLE"));
  1601     break;
  1602   case SCTP_ADDR_REMOVED:
  1603     LOG(("SCTP_ADDR_REMOVED"));
  1604     break;
  1605   case SCTP_ADDR_ADDED:
  1606     LOG(("SCTP_ADDR_ADDED"));
  1607     break;
  1608   case SCTP_ADDR_MADE_PRIM:
  1609     LOG(("SCTP_ADDR_MADE_PRIM"));
  1610     break;
  1611   case SCTP_ADDR_CONFIRMED:
  1612     LOG(("SCTP_ADDR_CONFIRMED"));
  1613     break;
  1614   default:
  1615     LOG(("UNKNOWN"));
  1616     break;
  1618   LOG((" (error = 0x%08x).\n", spc->spc_error));
  1621 void
  1622 DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
  1624   size_t i, n;
  1626   n = sre->sre_length - sizeof(struct sctp_remote_error);
  1627   LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
  1628   for (i = 0; i < n; ++i) {
  1629     LOG((" 0x%02x", sre-> sre_data[i]));
  1633 void
  1634 DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
  1636   LOG(("Shutdown event."));
  1637   /* XXX: notify all channels. */
  1638   // Attempts to actually send anything will fail
  1641 void
  1642 DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
  1644   LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
  1647 void
  1648 DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
  1650   size_t i, n;
  1652   if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
  1653     LOG(("Unsent "));
  1655    if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
  1656     LOG(("Sent "));
  1658   if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
  1659     LOG(("(flags = %x) ", ssfe->ssfe_flags));
  1661   LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
  1662        ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
  1663        ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
  1664   n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
  1665   for (i = 0; i < n; ++i) {
  1666     LOG((" 0x%02x", ssfe->ssfe_data[i]));
  1670 void
  1671 DataChannelConnection::ClearResets()
  1673   // Clear all pending resets
  1674   if (!mStreamsResetting.IsEmpty()) {
  1675     LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
  1678   for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
  1679     nsRefPtr<DataChannel> channel;
  1680     channel = FindChannelByStream(mStreamsResetting[i]);
  1681     if (channel) {
  1682       LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
  1683       mStreams[channel->mStream] = nullptr;
  1686   mStreamsResetting.Clear();
  1689 void
  1690 DataChannelConnection::ResetOutgoingStream(uint16_t stream)
  1692   uint32_t i;
  1694   mLock.AssertCurrentThreadOwns();
  1695   LOG(("Connection %p: Resetting outgoing stream %u",
  1696        (void *) this, stream));
  1697   // Rarely has more than a couple items and only for a short time
  1698   for (i = 0; i < mStreamsResetting.Length(); ++i) {
  1699     if (mStreamsResetting[i] == stream) {
  1700       return;
  1703   mStreamsResetting.AppendElement(stream);
  1706 void
  1707 DataChannelConnection::SendOutgoingStreamReset()
  1709   struct sctp_reset_streams *srs;
  1710   uint32_t i;
  1711   size_t len;
  1713   LOG(("Connection %p: Sending outgoing stream reset for %d streams",
  1714        (void *) this, mStreamsResetting.Length()));
  1715   mLock.AssertCurrentThreadOwns();
  1716   if (mStreamsResetting.IsEmpty()) {
  1717     LOG(("No streams to reset"));
  1718     return;
  1720   len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
  1721   srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
  1722   memset(srs, 0, len);
  1723   srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
  1724   srs->srs_number_streams = mStreamsResetting.Length();
  1725   for (i = 0; i < mStreamsResetting.Length(); ++i) {
  1726     srs->srs_stream_list[i] = mStreamsResetting[i];
  1728   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
  1729     LOG(("***failed: setsockopt RESET, errno %d", errno));
  1730     // if errno == EALREADY, this is normal - we can't send another reset
  1731     // with one pending.
  1732     // When we get an incoming reset (which may be a response to our
  1733     // outstanding one), see if we have any pending outgoing resets and
  1734     // send them
  1735   } else {
  1736     mStreamsResetting.Clear();
  1738   moz_free(srs);
  1741 void
  1742 DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
  1744   uint32_t n, i;
  1745   nsRefPtr<DataChannel> channel; // since we may null out the ref to the channel
  1747   if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
  1748       !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
  1749     n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
  1750     for (i = 0; i < n; ++i) {
  1751       if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
  1752         channel = FindChannelByStream(strrst->strreset_stream_list[i]);
  1753         if (channel) {
  1754           // The other side closed the channel
  1755           // We could be in three states:
  1756           // 1. Normal state (input and output streams (OPEN)
  1757           //    Notify application, send a RESET in response on our
  1758           //    outbound channel.  Go to CLOSED
  1759           // 2. We sent our own reset (CLOSING); either they crossed on the
  1760           //    wire, or this is a response to our Reset.
  1761           //    Go to CLOSED
  1762           // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
  1763           //    I believe this is impossible, as we don't have an input stream yet.
  1765           LOG(("Incoming: Channel %u  closed, state %d",
  1766                channel->mStream, channel->mState));
  1767           ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
  1768                         channel->mState == DataChannel::CLOSING ||
  1769                         channel->mState == DataChannel::CONNECTING ||
  1770                         channel->mState == DataChannel::WAITING_TO_OPEN);
  1771           if (channel->mState == DataChannel::OPEN ||
  1772               channel->mState == DataChannel::WAITING_TO_OPEN) {
  1773             ResetOutgoingStream(channel->mStream);
  1774             SendOutgoingStreamReset();
  1775             NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1776                                       DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1777                                       channel));
  1779           mStreams[channel->mStream] = nullptr;
  1781           LOG(("Disconnected DataChannel %p from connection %p",
  1782                (void *) channel.get(), (void *) channel->mConnection.get()));
  1783           channel->Destroy();
  1784           // At this point when we leave here, the object is a zombie held alive only by the DOM object
  1785         } else {
  1786           LOG(("Can't find incoming channel %d",i));
  1792   // In case we failed to send a RESET due to having one outstanding, process any pending resets now:
  1793   if (!mStreamsResetting.IsEmpty()) {
  1794     LOG(("Sending %d pending resets", mStreamsResetting.Length()));
  1795     SendOutgoingStreamReset();
  1799 void
  1800 DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
  1802   uint16_t stream;
  1803   uint32_t i;
  1804   nsRefPtr<DataChannel> channel;
  1806   if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
  1807     LOG(("*** Failed increasing number of streams from %u (%u/%u)",
  1808          mStreams.Length(),
  1809          strchg->strchange_instrms,
  1810          strchg->strchange_outstrms));
  1811     // XXX FIX! notify pending opens of failure
  1812     return;
  1813   } else {
  1814     if (strchg->strchange_instrms > mStreams.Length()) {
  1815       LOG(("Other side increased streams from %u to %u",
  1816            mStreams.Length(), strchg->strchange_instrms));
  1818     if (strchg->strchange_outstrms > mStreams.Length() ||
  1819         strchg->strchange_instrms > mStreams.Length()) {
  1820       uint16_t old_len = mStreams.Length();
  1821       uint16_t new_len = std::max(strchg->strchange_outstrms,
  1822                                   strchg->strchange_instrms);
  1823       LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
  1824            old_len, new_len, new_len - old_len,
  1825            strchg->strchange_instrms));
  1826       // make sure both are the same length
  1827       mStreams.AppendElements(new_len - old_len);
  1828       LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
  1829       for (size_t i = old_len; i < mStreams.Length(); ++i) {
  1830         mStreams[i] = nullptr;
  1832       // Re-process any channels waiting for streams.
  1833       // Linear search, but we don't increase channels often and
  1834       // the array would only get long in case of an app error normally
  1836       // Make sure we request enough streams if there's a big jump in streams
  1837       // Could make a more complex API for OpenXxxFinish() and avoid this loop
  1838       int32_t num_needed = mPending.GetSize();
  1839       LOG(("%d of %d new streams already needed", num_needed,
  1840            new_len - old_len));
  1841       num_needed -= (new_len - old_len); // number we added
  1842       if (num_needed > 0) {
  1843         if (num_needed < 16)
  1844           num_needed = 16;
  1845         LOG(("Not enough new streams, asking for %d more", num_needed));
  1846         RequestMoreStreams(num_needed);
  1847       } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
  1848         LOG(("Requesting %d output streams to match partner",
  1849              strchg->strchange_instrms - strchg->strchange_outstrms));
  1850         RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
  1853       ProcessQueuedOpens();
  1855     // else probably not a change in # of streams
  1858   for (i = 0; i < mStreams.Length(); ++i) {
  1859     channel = mStreams[i];
  1860     if (!channel)
  1861       continue;
  1863     if ((channel->mState == CONNECTING) &&
  1864         (channel->mStream == INVALID_STREAM)) {
  1865       if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
  1866           (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
  1867         /* XXX: Signal to the other end. */
  1868         channel->mState = CLOSED;
  1869         NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1870                                   DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1871                                   channel));
  1872         // maybe fire onError (bug 843625)
  1873       } else {
  1874         stream = FindFreeStream();
  1875         if (stream != INVALID_STREAM) {
  1876           channel->mStream = stream;
  1877           mStreams[stream] = channel;
  1878           channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
  1879           /// XXX fix
  1880           StartDefer();
  1881         } else {
  1882           /* We will not find more ... */
  1883           break;
  1891 // Called with mLock locked!
  1892 void
  1893 DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
  1895   mLock.AssertCurrentThreadOwns();
  1896   if (notif->sn_header.sn_length != (uint32_t)n) {
  1897     return;
  1899   switch (notif->sn_header.sn_type) {
  1900   case SCTP_ASSOC_CHANGE:
  1901     HandleAssociationChangeEvent(&(notif->sn_assoc_change));
  1902     break;
  1903   case SCTP_PEER_ADDR_CHANGE:
  1904     HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
  1905     break;
  1906   case SCTP_REMOTE_ERROR:
  1907     HandleRemoteErrorEvent(&(notif->sn_remote_error));
  1908     break;
  1909   case SCTP_SHUTDOWN_EVENT:
  1910     HandleShutdownEvent(&(notif->sn_shutdown_event));
  1911     break;
  1912   case SCTP_ADAPTATION_INDICATION:
  1913     HandleAdaptationIndication(&(notif->sn_adaptation_event));
  1914     break;
  1915   case SCTP_PARTIAL_DELIVERY_EVENT:
  1916     LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
  1917     break;
  1918   case SCTP_AUTHENTICATION_EVENT:
  1919     LOG(("SCTP_AUTHENTICATION_EVENT"));
  1920     break;
  1921   case SCTP_SENDER_DRY_EVENT:
  1922     //LOG(("SCTP_SENDER_DRY_EVENT"));
  1923     break;
  1924   case SCTP_NOTIFICATIONS_STOPPED_EVENT:
  1925     LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
  1926     break;
  1927   case SCTP_SEND_FAILED_EVENT:
  1928     HandleSendFailedEvent(&(notif->sn_send_failed_event));
  1929     break;
  1930   case SCTP_STREAM_RESET_EVENT:
  1931     HandleStreamResetEvent(&(notif->sn_strreset_event));
  1932     break;
  1933   case SCTP_ASSOC_RESET_EVENT:
  1934     LOG(("SCTP_ASSOC_RESET_EVENT"));
  1935     break;
  1936   case SCTP_STREAM_CHANGE_EVENT:
  1937     HandleStreamChangeEvent(&(notif->sn_strchange_event));
  1938     break;
  1939   default:
  1940     LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
  1941     break;
  1945 int
  1946 DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
  1947                                        struct sctp_rcvinfo rcv, int32_t flags)
  1949   ASSERT_WEBRTC(!NS_IsMainThread());
  1951   if (!data) {
  1952     usrsctp_close(sock); // SCTP has finished shutting down
  1953   } else {
  1954     MutexAutoLock lock(mLock);
  1955     if (flags & MSG_NOTIFICATION) {
  1956       HandleNotification(static_cast<union sctp_notification *>(data), datalen);
  1957     } else {
  1958       HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
  1961   // sctp allocates 'data' with malloc(), and expects the receiver to free
  1962   // it (presumably with free).
  1963   // XXX future optimization: try to deliver messages without an internal
  1964   // alloc/copy, and if so delay the free until later.
  1965   free(data);
  1966   // usrsctp defines the callback as returning an int, but doesn't use it
  1967   return 1;
  1970 already_AddRefed<DataChannel>
  1971 DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
  1972                             Type type, bool inOrder,
  1973                             uint32_t prValue, DataChannelListener *aListener,
  1974                             nsISupports *aContext, bool aExternalNegotiated,
  1975                             uint16_t aStream)
  1977   // aStream == INVALID_STREAM to have the protocol allocate
  1978   uint16_t prPolicy = SCTP_PR_SCTP_NONE;
  1979   uint32_t flags;
  1981   LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
  1982        PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
  1983        type, inOrder, prValue, aListener, aContext,
  1984        aExternalNegotiated ? "true" : "false", aStream));
  1985   switch (type) {
  1986     case DATA_CHANNEL_RELIABLE:
  1987       prPolicy = SCTP_PR_SCTP_NONE;
  1988       break;
  1989     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
  1990       prPolicy = SCTP_PR_SCTP_RTX;
  1991       break;
  1992     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
  1993       prPolicy = SCTP_PR_SCTP_TTL;
  1994       break;
  1996   if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
  1997     return nullptr;
  2000   // Don't look past currently-negotiated streams
  2001   if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
  2002     LOG(("ERROR: external negotiation of already-open channel %u", aStream));
  2003     // XXX How do we indicate this up to the application?  Probably the
  2004     // caller's job, but we may need to return an error code.
  2005     return nullptr;
  2008   flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
  2009   nsRefPtr<DataChannel> channel(new DataChannel(this,
  2010                                                 aStream,
  2011                                                 DataChannel::CONNECTING,
  2012                                                 label, protocol,
  2013                                                 type, prValue,
  2014                                                 flags,
  2015                                                 aListener, aContext));
  2016   if (aExternalNegotiated) {
  2017     channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
  2020   MutexAutoLock lock(mLock); // OpenFinish assumes this
  2021   return OpenFinish(channel.forget());
  2024 // Separate routine so we can also call it to finish up from pending opens
  2025 already_AddRefed<DataChannel>
  2026 DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
  2028   nsRefPtr<DataChannel> channel(aChannel); // takes the reference passed in
  2029   // Normally 1 reference if called from ::Open(), or 2 if called from
  2030   // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
  2031   uint16_t stream = channel->mStream;
  2032   bool queue = false;
  2034   mLock.AssertCurrentThreadOwns();
  2036   // Cases we care about:
  2037   // Pre-negotiated:
  2038   //    Not Open:
  2039   //      Doesn't fit:
  2040   //         -> change initial ask or renegotiate after open
  2041   //      -> queue open
  2042   //    Open:
  2043   //      Doesn't fit:
  2044   //         -> RequestMoreStreams && queue
  2045   //      Does fit:
  2046   //         -> open
  2047   // Not negotiated:
  2048   //    Not Open:
  2049   //      -> queue open
  2050   //    Open:
  2051   //      -> Try to get a stream
  2052   //      Doesn't fit:
  2053   //         -> RequestMoreStreams && queue
  2054   //      Does fit:
  2055   //         -> open
  2056   // So the Open cases are basically the same
  2057   // Not Open cases are simply queue for non-negotiated, and
  2058   // either change the initial ask or possibly renegotiate after open.
  2060   if (mState == OPEN) {
  2061     if (stream == INVALID_STREAM) {
  2062       stream = FindFreeStream(); // may be INVALID_STREAM if we need more
  2064     if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
  2065       // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
  2066       // to avoid going back immediately for more if the ask to N, N+1, etc
  2067       int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
  2068                             (stream-((int32_t)mStreams.Length())) + 16;
  2069       if (!RequestMoreStreams(more_needed)) {
  2070         // Something bad happened... we're done
  2071         goto request_error_cleanup;
  2073       queue = true;
  2075   } else {
  2076     // not OPEN
  2077     if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
  2078         mState == CLOSED) {
  2079       // Update number of streams for init message
  2080       struct sctp_initmsg initmsg;
  2081       socklen_t len = sizeof(initmsg);
  2082       int32_t total_needed = stream+16;
  2084       memset(&initmsg, 0, sizeof(initmsg));
  2085       if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
  2086         LOG(("*** failed getsockopt SCTP_INITMSG"));
  2087         goto request_error_cleanup;
  2089       LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
  2090            initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
  2091       initmsg.sinit_num_ostreams  = total_needed;
  2092       initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
  2093       if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
  2094                              (socklen_t)sizeof(initmsg)) < 0) {
  2095         LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
  2096         goto request_error_cleanup;
  2099       int32_t old_len = mStreams.Length();
  2100       mStreams.AppendElements(total_needed - old_len);
  2101       for (int32_t i = old_len; i < total_needed; ++i) {
  2102         mStreams[i] = nullptr;
  2105     // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
  2106     // is called, if needed
  2107     queue = true;
  2109   if (queue) {
  2110     LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
  2111     // Also serves to mark we told the app
  2112     channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
  2113     channel->AddRef(); // we need a ref for the nsDeQue and one to return
  2114     mPending.Push(channel);
  2115     return channel.forget();
  2118   MOZ_ASSERT(stream != INVALID_STREAM);
  2119   // just allocated (& OPEN), or externally negotiated
  2120   mStreams[stream] = channel; // holds a reference
  2121   channel->mStream = stream;
  2123 #ifdef TEST_QUEUED_DATA
  2124   // It's painful to write a test for this...
  2125   channel->mState = OPEN;
  2126   channel->mReady = true;
  2127   SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
  2128 #endif
  2130   if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
  2131     // Don't send unordered until this gets cleared
  2132     channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
  2135   if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
  2136     if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
  2137                                 stream,
  2138                                 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
  2139                                 channel->mPrPolicy, channel->mPrValue)) {
  2140       LOG(("SendOpenRequest failed, errno = %d", errno));
  2141       if (errno == EAGAIN || errno == EWOULDBLOCK) {
  2142         channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
  2143         StartDefer();
  2145         return channel.forget();
  2146       } else {
  2147         if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
  2148           // We already returned the channel to the app.
  2149           NS_ERROR("Failed to send open request");
  2150           NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  2151                                     DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  2152                                     channel));
  2154         // If we haven't returned the channel yet, it will get destroyed when we exit
  2155         // this function.
  2156         mStreams[stream] = nullptr;
  2157         channel->mStream = INVALID_STREAM;
  2158         // we'll be destroying the channel
  2159         channel->mState = CLOSED;
  2160         return nullptr;
  2162       /* NOTREACHED */
  2165   // Either externally negotiated or we sent Open
  2166   channel->mState = OPEN;
  2167   channel->mReady = true;
  2168   // FIX?  Move into DOMDataChannel?  I don't think we can send it yet here
  2169   LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  2170   NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  2171                             DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
  2172                             channel));
  2174   return channel.forget();
  2176 request_error_cleanup:
  2177   channel->mState = CLOSED;
  2178   if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
  2179     // We already returned the channel to the app.
  2180     NS_ERROR("Failed to request more streams");
  2181     NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  2182                               DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  2183                               channel));
  2184     return channel.forget();
  2186   // we'll be destroying the channel, but it never really got set up
  2187   // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
  2188   // Dispatch it to ourselves
  2189   return nullptr;
  2192 int32_t
  2193 DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
  2194                                        uint32_t length, uint32_t ppid)
  2196   uint16_t flags;
  2197   struct sctp_sendv_spa spa;
  2198   int32_t result;
  2200   NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
  2201   NS_WARN_IF_FALSE(length > 0, "Length is 0?!");
  2203   // To avoid problems where an in-order OPEN is lost and an
  2204   // out-of-order data message "beats" it, require data to be in-order
  2205   // until we get an ACK.
  2206   if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
  2207       !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
  2208     flags = SCTP_UNORDERED;
  2209   } else {
  2210     flags = 0;
  2213   spa.sendv_sndinfo.snd_ppid = htonl(ppid);
  2214   spa.sendv_sndinfo.snd_sid = channel->mStream;
  2215   spa.sendv_sndinfo.snd_flags = flags;
  2216   spa.sendv_sndinfo.snd_context = 0;
  2217   spa.sendv_sndinfo.snd_assoc_id = 0;
  2218   spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
  2220   if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
  2221     spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
  2222     spa.sendv_prinfo.pr_value = channel->mPrValue;
  2223     spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  2226   // Note: Main-thread IO, but doesn't block!
  2227   // XXX FIX!  to deal with heavy overruns of JS trying to pass data in
  2228   // (more than the buffersize) queue data onto another thread to do the
  2229   // actual sends.  See netwerk/protocol/websocket/WebSocketChannel.cpp
  2231   // SCTP will return EMSGSIZE if the message is bigger than the buffer
  2232   // size (or EAGAIN if there isn't space)
  2233   if (channel->mBufferedData.IsEmpty()) {
  2234     result = usrsctp_sendv(mSocket, data, length,
  2235                            nullptr, 0,
  2236                            (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
  2237                            SCTP_SENDV_SPA, 0);
  2238     LOG(("Sent buffer (len=%u), result=%d", length, result));
  2239   } else {
  2240     // Fake EAGAIN if we're already buffering data
  2241     result = -1;
  2242     errno = EAGAIN;
  2244   if (result < 0) {
  2245     if (errno == EAGAIN || errno == EWOULDBLOCK) {
  2246       // queue data for resend!  And queue any further data for the stream until it is...
  2247       BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc
  2248       channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
  2249       channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
  2250       LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
  2251       StartDefer();
  2252       return 0;
  2254     LOG(("error %d sending string", errno));
  2256   return result;
  2259 // Handles fragmenting binary messages
  2260 int32_t
  2261 DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
  2262                                   uint32_t len,
  2263                                   uint32_t ppid_partial, uint32_t ppid_final)
  2265   // Since there's a limit on network buffer size and no limits on message
  2266   // size, and we don't want to use EOR mode (multiple writes for a
  2267   // message, but all other streams are blocked until you finish sending
  2268   // this message), we need to add application-level fragmentation of large
  2269   // messages.  On a reliable channel, these can be simply rebuilt into a
  2270   // large message.  On an unreliable channel, we can't and don't know how
  2271   // long to wait, and there are no retransmissions, and no easy way to
  2272   // tell the user "this part is missing", so on unreliable channels we
  2273   // need to return an error if sending more bytes than the network buffers
  2274   // can hold, and perhaps a lower number.
  2276   // We *really* don't want to do this from main thread! - and SendMsgInternal
  2277   // avoids blocking.
  2278   // This MUST be reliable and in-order for the reassembly to work
  2279   if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
  2280       channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
  2281       !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
  2282     int32_t sent=0;
  2283     uint32_t origlen = len;
  2284     LOG(("Sending binary message length %u in chunks", len));
  2285     // XXX check flags for out-of-order, or force in-order for large binary messages
  2286     while (len > 0) {
  2287       uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
  2288       uint32_t ppid;
  2289       len -= sendlen;
  2290       ppid = len > 0 ? ppid_partial : ppid_final;
  2291       LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
  2292       // Note that these might end up being deferred and queued.
  2293       sent += SendMsgInternal(channel, data, sendlen, ppid);
  2294       data += sendlen;
  2296     LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
  2297          (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
  2298          origlen, sent,
  2299          channel->mBufferedData.Length()));
  2300     return sent;
  2302   NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
  2303                    "Sending too-large data on unreliable channel!");
  2305   // This will fail if the message is too large (default 256K)
  2306   return SendMsgInternal(channel, data, len, ppid_final);
  2309 class ReadBlobRunnable : public nsRunnable {
  2310 public:
  2311   ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
  2312     nsIInputStream* aBlob) :
  2313     mConnection(aConnection),
  2314     mStream(aStream),
  2315     mBlob(aBlob)
  2316   { }
  2318   NS_IMETHODIMP Run() {
  2319     // ReadBlob() is responsible to releasing the reference
  2320     DataChannelConnection *self = mConnection;
  2321     self->ReadBlob(mConnection.forget(), mStream, mBlob);
  2322     return NS_OK;
  2325 private:
  2326   // Make sure the Connection doesn't die while there are jobs outstanding.
  2327   // Let it die (if released by PeerConnectionImpl while we're running)
  2328   // when we send our runnable back to MainThread.  Then ~DataChannelConnection
  2329   // can send the IOThread to MainThread to die in a runnable, avoiding
  2330   // unsafe event loop recursion.  Evil.
  2331   nsRefPtr<DataChannelConnection> mConnection;
  2332   uint16_t mStream;
  2333   // Use RefCount for preventing the object is deleted when SendBlob returns.
  2334   nsRefPtr<nsIInputStream> mBlob;
  2335 };
  2337 int32_t
  2338 DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
  2340   DataChannel *channel = mStreams[stream];
  2341   NS_ENSURE_TRUE(channel, 0);
  2342   // Spawn a thread to send the data
  2343   if (!mInternalIOThread) {
  2344     nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
  2345     if (NS_FAILED(res)) {
  2346       return -1;
  2350   nsCOMPtr<nsIRunnable> runnable = new ReadBlobRunnable(this, stream, aBlob);
  2351   mInternalIOThread->Dispatch(runnable, NS_DISPATCH_NORMAL);
  2352   return 0;
  2355 void
  2356 DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
  2357                                 uint16_t aStream, nsIInputStream* aBlob)
  2359   // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
  2360   // it off mainthread; if PeerConnectionImpl has released then we want
  2361   // ~DataChannelConnection() to run on MainThread
  2363   // XXX to do this safely, we must enqueue these atomically onto the
  2364   // output socket.  We need a sender thread(s?) to enque data into the
  2365   // socket and to avoid main-thread IO that might block.  Even on a
  2366   // background thread, we may not want to block on one stream's data.
  2367   // I.e. run non-blocking and service multiple channels.
  2369   // For now as a hack, send as a single blast of queued packets which may
  2370   // be deferred until buffer space is available.
  2371   nsCString temp;
  2372   uint64_t len;
  2373   nsCOMPtr<nsIThread> mainThread;
  2374   NS_GetMainThread(getter_AddRefs(mainThread));
  2376   if (NS_FAILED(aBlob->Available(&len)) ||
  2377       NS_FAILED(NS_ReadInputStreamToString(aBlob, temp, len))) {
  2378     // Bug 966602:  Doesn't return an error to the caller via onerror.
  2379     // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
  2380     NS_ProxyRelease(mainThread, aThis.take());
  2381     return;
  2383   aBlob->Close();
  2384   RUN_ON_THREAD(mainThread, WrapRunnable(nsRefPtr<DataChannelConnection>(aThis),
  2385                                &DataChannelConnection::SendBinaryMsg,
  2386                                aStream, temp),
  2387                 NS_DISPATCH_NORMAL);
  2390 void
  2391 DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
  2393   ASSERT_WEBRTC(NS_IsMainThread());
  2394   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
  2395     if (mStreams[i]) {
  2396       aStreamList->push_back(mStreams[i]->mStream);
  2401 int32_t
  2402 DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
  2403                                      bool isBinary)
  2405   ASSERT_WEBRTC(NS_IsMainThread());
  2406   // We really could allow this from other threads, so long as we deal with
  2407   // asynchronosity issues with channels closing, in particular access to
  2408   // mStreams, and issues with the association closing (access to mSocket).
  2410   const char *data = aMsg.BeginReading();
  2411   uint32_t len     = aMsg.Length();
  2412   DataChannel *channel;
  2414   LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
  2415   // XXX if we want more efficiency, translate flags once at open time
  2416   channel = mStreams[stream];
  2417   NS_ENSURE_TRUE(channel, 0);
  2419   if (isBinary)
  2420     return SendBinary(channel, data, len,
  2421                       DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
  2422   return SendBinary(channel, data, len,
  2423                     DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
  2426 void
  2427 DataChannelConnection::Close(DataChannel *aChannel)
  2429   MutexAutoLock lock(mLock);
  2430   CloseInt(aChannel);
  2433 // So we can call Close() with the lock already held
  2434 // Called from someone who holds a ref via ::Close(), or from ~DataChannel
  2435 void
  2436 DataChannelConnection::CloseInt(DataChannel *aChannel)
  2438   MOZ_ASSERT(aChannel);
  2439   nsRefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
  2441   mLock.AssertCurrentThreadOwns();
  2442   LOG(("Connection %p/Channel %p: Closing stream %u",
  2443        channel->mConnection.get(), channel.get(), channel->mStream));
  2444   // re-test since it may have closed before the lock was grabbed
  2445   if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
  2446     LOG(("Channel already closing/closed (%u)", aChannel->mState));
  2447     if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
  2448       // called from CloseAll()
  2449       // we're not going to hang around waiting any more
  2450       mStreams[channel->mStream] = nullptr;
  2452     return;
  2454   aChannel->mBufferedData.Clear();
  2455   if (channel->mStream != INVALID_STREAM) {
  2456     ResetOutgoingStream(channel->mStream);
  2457     if (mState == CLOSED) { // called from CloseAll()
  2458       // Let resets accumulate then send all at once in CloseAll()
  2459       // we're not going to hang around waiting
  2460       mStreams[channel->mStream] = nullptr;
  2461     } else {
  2462       SendOutgoingStreamReset();
  2465   aChannel->mState = CLOSING;
  2466   if (mState == CLOSED) {
  2467     // we're not going to hang around waiting
  2468     channel->Destroy();
  2470   // At this point when we leave here, the object is a zombie held alive only by the DOM object
  2473 void DataChannelConnection::CloseAll()
  2475   LOG(("Closing all channels (connection %p)", (void*) this));
  2476   // Don't need to lock here
  2478   // Make sure no more channels will be opened
  2480     MutexAutoLock lock(mLock);
  2481     mState = CLOSED;
  2484   // Close current channels
  2485   // If there are runnables, they hold a strong ref and keep the channel
  2486   // and/or connection alive (even if in a CLOSED state)
  2487   bool closed_some = false;
  2488   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
  2489     if (mStreams[i]) {
  2490       mStreams[i]->Close();
  2491       closed_some = true;
  2495   // Clean up any pending opens for channels
  2496   nsRefPtr<DataChannel> channel;
  2497   while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
  2498     LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
  2499     channel->Close(); // also releases the ref on each iteration
  2500     closed_some = true;
  2502   // It's more efficient to let the Resets queue in shutdown and then
  2503   // SendOutgoingStreamReset() here.
  2504   if (closed_some) {
  2505     MutexAutoLock lock(mLock);
  2506     SendOutgoingStreamReset();
  2510 DataChannel::~DataChannel()
  2512   // NS_ASSERTION since this is more "I think I caught all the cases that
  2513   // can cause this" than a true kill-the-program assertion.  If this is
  2514   // wrong, nothing bad happens.  A worst it's a leak.
  2515   NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
  2518 void
  2519 DataChannel::Close()
  2521   ENSURE_DATACONNECTION;
  2522   mConnection->Close(this);
  2525 // Used when disconnecting from the DataChannelConnection
  2526 void
  2527 DataChannel::Destroy()
  2529   ENSURE_DATACONNECTION;
  2531   LOG(("Destroying Data channel %u", mStream));
  2532   MOZ_ASSERT_IF(mStream != INVALID_STREAM,
  2533                 !mConnection->FindChannelByStream(mStream));
  2534   mStream = INVALID_STREAM;
  2535   mState = CLOSED;
  2536   mConnection = nullptr;
  2539 void
  2540 DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
  2542   MutexAutoLock mLock(mListenerLock);
  2543   mContext = aContext;
  2544   mListener = aListener;
  2547 // May be called from another (i.e. Main) thread!
  2548 void
  2549 DataChannel::AppReady()
  2551   ENSURE_DATACONNECTION;
  2553   MutexAutoLock lock(mConnection->mLock);
  2555   mReady = true;
  2556   if (mState == WAITING_TO_OPEN) {
  2557     mState = OPEN;
  2558     NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  2559                               DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
  2560                               this));
  2561     for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
  2562       nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
  2563       MOZ_ASSERT(runnable);
  2564       NS_DispatchToMainThread(runnable);
  2566   } else {
  2567     NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
  2569   mQueuedMessages.Clear();
  2570   mQueuedMessages.Compact();
  2571   // We never use it again...  We could even allocate the array in the odd
  2572   // cases we need it.
  2575 uint32_t
  2576 DataChannel::GetBufferedAmount()
  2578   uint32_t buffered = 0;
  2579   for (uint32_t i = 0; i < mBufferedData.Length(); ++i) {
  2580     buffered += mBufferedData[i]->mLength;
  2582   return buffered;
  2585 // Called with mLock locked!
  2586 void
  2587 DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
  2589   if (!mReady &&
  2590       (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
  2591     mQueuedMessages.AppendElement(aMessage);
  2592   } else {
  2593     NS_DispatchToMainThread(aMessage);
  2597 } // namespace mozilla

mercurial