netwerk/sctp/datachannel/DataChannel.cpp

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/netwerk/sctp/datachannel/DataChannel.cpp	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,2597 @@
     1.4 +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     1.5 +/* vim: set ts=2 et sw=2 tw=80: */
     1.6 +/* This Source Code Form is subject to the terms of the Mozilla Public
     1.7 + * License, v. 2.0. If a copy of the MPL was not distributed with this file,
     1.8 + * You can obtain one at http://mozilla.org/MPL/2.0/. */
     1.9 +
    1.10 +#include <stdio.h>
    1.11 +#include <stdlib.h>
    1.12 +#if !defined(__Userspace_os_Windows)
    1.13 +#include <arpa/inet.h>
    1.14 +#endif
    1.15 +// usrsctp.h expects to have errno definitions prior to its inclusion.
    1.16 +#include <errno.h>
    1.17 +
    1.18 +#define SCTP_DEBUG 1
    1.19 +#define SCTP_STDINT_INCLUDE <stdint.h>
    1.20 +
    1.21 +#ifdef _MSC_VER
    1.22 +// Disable "warning C4200: nonstandard extension used : zero-sized array in
    1.23 +//          struct/union"
    1.24 +// ...which the third-party file usrsctp.h runs afoul of.
    1.25 +#pragma warning(push)
    1.26 +#pragma warning(disable:4200)
    1.27 +#endif
    1.28 +
    1.29 +#include "usrsctp.h"
    1.30 +
    1.31 +#ifdef _MSC_VER
    1.32 +#pragma warning(pop)
    1.33 +#endif
    1.34 +
    1.35 +#include "DataChannelLog.h"
    1.36 +
    1.37 +#include "nsServiceManagerUtils.h"
    1.38 +#include "nsIObserverService.h"
    1.39 +#include "nsIObserver.h"
    1.40 +#include "mozilla/Services.h"
    1.41 +#include "nsProxyRelease.h"
    1.42 +#include "nsThread.h"
    1.43 +#include "nsThreadUtils.h"
    1.44 +#include "nsAutoPtr.h"
    1.45 +#include "nsNetUtil.h"
    1.46 +#include "mozilla/StaticPtr.h"
    1.47 +#ifdef MOZ_PEERCONNECTION
    1.48 +#include "mtransport/runnable_utils.h"
    1.49 +#endif
    1.50 +
    1.51 +#define DATACHANNEL_LOG(args) LOG(args)
    1.52 +#include "DataChannel.h"
    1.53 +#include "DataChannelProtocol.h"
    1.54 +
    1.55 +#ifdef PR_LOGGING
    1.56 +PRLogModuleInfo*
    1.57 +GetDataChannelLog()
    1.58 +{
    1.59 +  static PRLogModuleInfo* sLog;
    1.60 +  if (!sLog)
    1.61 +    sLog = PR_NewLogModule("DataChannel");
    1.62 +  return sLog;
    1.63 +}
    1.64 +
    1.65 +PRLogModuleInfo*
    1.66 +GetSCTPLog()
    1.67 +{
    1.68 +  static PRLogModuleInfo* sLog;
    1.69 +  if (!sLog)
    1.70 +    sLog = PR_NewLogModule("SCTP");
    1.71 +  return sLog;
    1.72 +}
    1.73 +#endif
    1.74 +
    1.75 +// Let us turn on and off important assertions in non-debug builds
    1.76 +#ifdef DEBUG
    1.77 +#define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
    1.78 +#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
    1.79 +#define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
    1.80 +#endif
    1.81 +
    1.82 +static bool sctp_initialized;
    1.83 +
    1.84 +namespace mozilla {
    1.85 +
    1.86 +class DataChannelShutdown;
    1.87 +StaticRefPtr<DataChannelShutdown> gDataChannelShutdown;
    1.88 +
    1.89 +class DataChannelShutdown : public nsIObserver
    1.90 +{
    1.91 +public:
    1.92 +  // This needs to be tied to some form object that is guaranteed to be
    1.93 +  // around (singleton likely) unless we want to shutdown sctp whenever
    1.94 +  // we're not using it (and in which case we'd keep a refcnt'd object
    1.95 +  // ref'd by each DataChannelConnection to release the SCTP usrlib via
    1.96 +  // sctp_finish)
    1.97 +
    1.98 +  NS_DECL_ISUPPORTS
    1.99 +
   1.100 +  DataChannelShutdown() {}
   1.101 +
   1.102 +  void Init()
   1.103 +    {
   1.104 +      nsCOMPtr<nsIObserverService> observerService =
   1.105 +        mozilla::services::GetObserverService();
   1.106 +      if (!observerService)
   1.107 +        return;
   1.108 +
   1.109 +      nsresult rv = observerService->AddObserver(this,
   1.110 +                                                 "profile-change-net-teardown",
   1.111 +                                                 false);
   1.112 +      MOZ_ASSERT(rv == NS_OK);
   1.113 +      (void) rv;
   1.114 +    }
   1.115 +
   1.116 +  virtual ~DataChannelShutdown()
   1.117 +    {
   1.118 +      nsCOMPtr<nsIObserverService> observerService =
   1.119 +        mozilla::services::GetObserverService();
   1.120 +      if (observerService)
   1.121 +        observerService->RemoveObserver(this, "profile-change-net-teardown");
   1.122 +    }
   1.123 +
   1.124 +  NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic,
   1.125 +                        const char16_t* aData) {
   1.126 +    if (strcmp(aTopic, "profile-change-net-teardown") == 0) {
   1.127 +      LOG(("Shutting down SCTP"));
   1.128 +      if (sctp_initialized) {
   1.129 +        usrsctp_finish();
   1.130 +        sctp_initialized = false;
   1.131 +      }
   1.132 +      nsCOMPtr<nsIObserverService> observerService =
   1.133 +        mozilla::services::GetObserverService();
   1.134 +      if (!observerService)
   1.135 +        return NS_ERROR_FAILURE;
   1.136 +
   1.137 +      nsresult rv = observerService->RemoveObserver(this,
   1.138 +                                                    "profile-change-net-teardown");
   1.139 +      MOZ_ASSERT(rv == NS_OK);
   1.140 +      (void) rv;
   1.141 +
   1.142 +      nsRefPtr<DataChannelShutdown> kungFuDeathGrip(this);
   1.143 +      gDataChannelShutdown = nullptr;
   1.144 +    }
   1.145 +    return NS_OK;
   1.146 +  }
   1.147 +};
   1.148 +
   1.149 +NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
   1.150 +
   1.151 +
   1.152 +BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
   1.153 +                         uint32_t length) : mLength(length)
   1.154 +{
   1.155 +  mSpa = new sctp_sendv_spa;
   1.156 +  *mSpa = spa;
   1.157 +  char *tmp = new char[length]; // infallible malloc!
   1.158 +  memcpy(tmp, data, length);
   1.159 +  mData = tmp;
   1.160 +}
   1.161 +
   1.162 +BufferedMsg::~BufferedMsg()
   1.163 +{
   1.164 +  delete mSpa;
   1.165 +  delete mData;
   1.166 +}
   1.167 +
   1.168 +static int
   1.169 +receive_cb(struct socket* sock, union sctp_sockstore addr,
   1.170 +           void *data, size_t datalen,
   1.171 +           struct sctp_rcvinfo rcv, int flags, void *ulp_info)
   1.172 +{
   1.173 +  DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
   1.174 +  return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
   1.175 +}
   1.176 +
   1.177 +#ifdef PR_LOGGING
   1.178 +static void
   1.179 +debug_printf(const char *format, ...)
   1.180 +{
   1.181 +  va_list ap;
   1.182 +  char buffer[1024];
   1.183 +
   1.184 +  if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) {
   1.185 +    va_start(ap, format);
   1.186 +#ifdef _WIN32
   1.187 +    if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
   1.188 +#else
   1.189 +    if (vsnprintf(buffer, sizeof(buffer), format, ap) > 0) {
   1.190 +#endif
   1.191 +      PR_LogPrint("%s", buffer);
   1.192 +    }
   1.193 +    va_end(ap);
   1.194 +  }
   1.195 +}
   1.196 +#endif
   1.197 +
   1.198 +DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
   1.199 +   mLock("netwerk::sctp::DataChannelConnection")
   1.200 +{
   1.201 +  mState = CLOSED;
   1.202 +  mSocket = nullptr;
   1.203 +  mMasterSocket = nullptr;
   1.204 +  mListener = listener->asWeakPtr();
   1.205 +  mLocalPort = 0;
   1.206 +  mRemotePort = 0;
   1.207 +  mDeferTimeout = 10;
   1.208 +  mTimerRunning = false;
   1.209 +  LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
   1.210 +  mInternalIOThread = nullptr;
   1.211 +}
   1.212 +
   1.213 +DataChannelConnection::~DataChannelConnection()
   1.214 +{
   1.215 +  LOG(("Deleting DataChannelConnection %p", (void *) this));
   1.216 +  // This may die on the MainThread, or on the STS thread
   1.217 +  ASSERT_WEBRTC(mState == CLOSED);
   1.218 +  MOZ_ASSERT(!mMasterSocket);
   1.219 +  MOZ_ASSERT(mPending.GetSize() == 0);
   1.220 +
   1.221 +  // Already disconnected from sigslot/mTransportFlow
   1.222 +  // TransportFlows must be released from the STS thread
   1.223 +  if (!IsSTSThread()) {
   1.224 +    ASSERT_WEBRTC(NS_IsMainThread());
   1.225 +    if (mTransportFlow) {
   1.226 +      ASSERT_WEBRTC(mSTS);
   1.227 +      NS_ProxyRelease(mSTS, mTransportFlow);
   1.228 +    }
   1.229 +
   1.230 +    if (mInternalIOThread) {
   1.231 +      // Avoid spinning the event thread from here (which if we're mainthread
   1.232 +      // is in the event loop already)
   1.233 +      NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
   1.234 +                                           &nsIThread::Shutdown),
   1.235 +                              NS_DISPATCH_NORMAL);
   1.236 +    }
   1.237 +  } else {
   1.238 +    // on STS, safe to call shutdown
   1.239 +    if (mInternalIOThread) {
   1.240 +      mInternalIOThread->Shutdown();
   1.241 +    }
   1.242 +  }
   1.243 +}
   1.244 +
   1.245 +void
   1.246 +DataChannelConnection::Destroy()
   1.247 +{
   1.248 +  // Though it's probably ok to do this and close the sockets;
   1.249 +  // if we really want it to do true clean shutdowns it can
   1.250 +  // create a dependant Internal object that would remain around
   1.251 +  // until the network shut down the association or timed out.
   1.252 +  LOG(("Destroying DataChannelConnection %p", (void *) this));
   1.253 +  ASSERT_WEBRTC(NS_IsMainThread());
   1.254 +  CloseAll();
   1.255 +
   1.256 +  MutexAutoLock lock(mLock);
   1.257 +  // If we had a pending reset, we aren't waiting for it - clear the list so
   1.258 +  // we can deregister this DataChannelConnection without leaking.
   1.259 +  ClearResets();
   1.260 +
   1.261 +  MOZ_ASSERT(mSTS);
   1.262 +  ASSERT_WEBRTC(NS_IsMainThread());
   1.263 +  // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
   1.264 +  // the usrsctp_close() calls can move back here (and just proxy the
   1.265 +  // disconnect_all())
   1.266 +  RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this),
   1.267 +                                   &DataChannelConnection::DestroyOnSTS,
   1.268 +                                   mSocket, mMasterSocket),
   1.269 +                NS_DISPATCH_NORMAL);
   1.270 +
   1.271 +  // These will be released on STS
   1.272 +  mSocket = nullptr;
   1.273 +  mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
   1.274 +
   1.275 +  // Must do this in Destroy() since we may then delete this object
   1.276 +  if (mUsingDtls) {
   1.277 +    usrsctp_deregister_address(static_cast<void *>(this));
   1.278 +    LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
   1.279 +  }
   1.280 +
   1.281 +  // We can't get any more new callbacks from the SCTP library
   1.282 +  // All existing callbacks have refs to DataChannelConnection
   1.283 +
   1.284 +  // nsDOMDataChannel objects have refs to DataChannels that have refs to us
   1.285 +}
   1.286 +
   1.287 +void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
   1.288 +                                         struct socket *aSocket)
   1.289 +{
   1.290 +  if (aSocket && aSocket != aMasterSocket)
   1.291 +    usrsctp_close(aSocket);
   1.292 +  if (aMasterSocket)
   1.293 +    usrsctp_close(aMasterSocket);
   1.294 +
   1.295 +  disconnect_all();
   1.296 +}
   1.297 +
   1.298 +NS_IMPL_ISUPPORTS(DataChannelConnection,
   1.299 +                  nsITimerCallback)
   1.300 +
   1.301 +bool
   1.302 +DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
   1.303 +{
   1.304 +  struct sctp_initmsg initmsg;
   1.305 +  struct sctp_udpencaps encaps;
   1.306 +  struct sctp_assoc_value av;
   1.307 +  struct sctp_event event;
   1.308 +  socklen_t len;
   1.309 +
   1.310 +  uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
   1.311 +                            SCTP_PEER_ADDR_CHANGE,
   1.312 +                            SCTP_REMOTE_ERROR,
   1.313 +                            SCTP_SHUTDOWN_EVENT,
   1.314 +                            SCTP_ADAPTATION_INDICATION,
   1.315 +                            SCTP_SEND_FAILED_EVENT,
   1.316 +                            SCTP_STREAM_RESET_EVENT,
   1.317 +                            SCTP_STREAM_CHANGE_EVENT};
   1.318 +  {
   1.319 +    ASSERT_WEBRTC(NS_IsMainThread());
   1.320 +
   1.321 +    // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
   1.322 +    if (!sctp_initialized) {
   1.323 +      if (aUsingDtls) {
   1.324 +        LOG(("sctp_init(DTLS)"));
   1.325 +#ifdef MOZ_PEERCONNECTION
   1.326 +        usrsctp_init(0,
   1.327 +                     DataChannelConnection::SctpDtlsOutput,
   1.328 +#ifdef PR_LOGGING
   1.329 +                     debug_printf
   1.330 +#else
   1.331 +                     nullptr
   1.332 +#endif
   1.333 +                    );
   1.334 +#else
   1.335 +        NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
   1.336 +#endif
   1.337 +      } else {
   1.338 +        LOG(("sctp_init(%u)", aPort));
   1.339 +        usrsctp_init(aPort,
   1.340 +                     nullptr,
   1.341 +#ifdef PR_LOGGING
   1.342 +                     debug_printf
   1.343 +#else
   1.344 +                     nullptr
   1.345 +#endif
   1.346 +                    );
   1.347 +      }
   1.348 +
   1.349 +#ifdef PR_LOGGING
   1.350 +      // Set logging to SCTP:PR_LOG_DEBUG to get SCTP debugs
   1.351 +      if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) {
   1.352 +        usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
   1.353 +      }
   1.354 +#endif
   1.355 +      usrsctp_sysctl_set_sctp_blackhole(2);
   1.356 +      // ECN is currently not supported by the Firefox code
   1.357 +      usrsctp_sysctl_set_sctp_ecn_enable(0);
   1.358 +      sctp_initialized = true;
   1.359 +
   1.360 +      gDataChannelShutdown = new DataChannelShutdown();
   1.361 +      gDataChannelShutdown->Init();
   1.362 +    }
   1.363 +  }
   1.364 +
   1.365 +  // XXX FIX! make this a global we get once
   1.366 +  // Find the STS thread
   1.367 +  nsresult rv;
   1.368 +  mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
   1.369 +  MOZ_ASSERT(NS_SUCCEEDED(rv));
   1.370 +
   1.371 +  // Open sctp with a callback
   1.372 +  if ((mMasterSocket = usrsctp_socket(
   1.373 +         aUsingDtls ? AF_CONN : AF_INET,
   1.374 +         SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) {
   1.375 +    return false;
   1.376 +  }
   1.377 +
   1.378 +  // Make non-blocking for bind/connect.  SCTP over UDP defaults to non-blocking
   1.379 +  // in associations for normal IO
   1.380 +  if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
   1.381 +    LOG(("Couldn't set non_blocking on SCTP socket"));
   1.382 +    // We can't handle connect() safely if it will block, not that this will
   1.383 +    // even happen.
   1.384 +    goto error_cleanup;
   1.385 +  }
   1.386 +
   1.387 +  // Make sure when we close the socket, make sure it doesn't call us back again!
   1.388 +  // This would cause it try to use an invalid DataChannelConnection pointer
   1.389 +  struct linger l;
   1.390 +  l.l_onoff = 1;
   1.391 +  l.l_linger = 0;
   1.392 +  if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
   1.393 +                         (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
   1.394 +    LOG(("Couldn't set SO_LINGER on SCTP socket"));
   1.395 +    // unsafe to allow it to continue if this fails
   1.396 +    goto error_cleanup;
   1.397 +  }
   1.398 +
   1.399 +  // XXX Consider disabling this when we add proper SDP negotiation.
   1.400 +  // We may want to leave enabled for supporting 'cloning' of SDP offers, which
   1.401 +  // implies re-use of the same pseudo-port number, or forcing a renegotiation.
   1.402 +  {
   1.403 +    uint32_t on = 1;
   1.404 +    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
   1.405 +                           (const void *)&on, (socklen_t)sizeof(on)) < 0) {
   1.406 +      LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
   1.407 +    }
   1.408 +    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
   1.409 +                           (const void *)&on, (socklen_t)sizeof(on)) < 0) {
   1.410 +      LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
   1.411 +    }
   1.412 +  }
   1.413 +
   1.414 +  if (!aUsingDtls) {
   1.415 +    memset(&encaps, 0, sizeof(encaps));
   1.416 +    encaps.sue_address.ss_family = AF_INET;
   1.417 +    encaps.sue_port = htons(aPort);
   1.418 +    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
   1.419 +                           (const void*)&encaps,
   1.420 +                           (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
   1.421 +      LOG(("*** failed encaps errno %d", errno));
   1.422 +      goto error_cleanup;
   1.423 +    }
   1.424 +    LOG(("SCTP encapsulation local port %d", aPort));
   1.425 +  }
   1.426 +
   1.427 +  av.assoc_id = SCTP_ALL_ASSOC;
   1.428 +  av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
   1.429 +  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
   1.430 +                         (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
   1.431 +    LOG(("*** failed enable stream reset errno %d", errno));
   1.432 +    goto error_cleanup;
   1.433 +  }
   1.434 +
   1.435 +  /* Enable the events of interest. */
   1.436 +  memset(&event, 0, sizeof(event));
   1.437 +  event.se_assoc_id = SCTP_ALL_ASSOC;
   1.438 +  event.se_on = 1;
   1.439 +  for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) {
   1.440 +    event.se_type = event_types[i];
   1.441 +    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
   1.442 +      LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
   1.443 +      goto error_cleanup;
   1.444 +    }
   1.445 +  }
   1.446 +
   1.447 +  // Update number of streams
   1.448 +  mStreams.AppendElements(aNumStreams);
   1.449 +  for (uint32_t i = 0; i < aNumStreams; ++i) {
   1.450 +    mStreams[i] = nullptr;
   1.451 +  }
   1.452 +  memset(&initmsg, 0, sizeof(initmsg));
   1.453 +  len = sizeof(initmsg);
   1.454 +  if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
   1.455 +    LOG(("*** failed getsockopt SCTP_INITMSG"));
   1.456 +    goto error_cleanup;
   1.457 +  }
   1.458 +  LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
   1.459 +       initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
   1.460 +  initmsg.sinit_num_ostreams  = aNumStreams;
   1.461 +  initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
   1.462 +  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
   1.463 +                         (socklen_t)sizeof(initmsg)) < 0) {
   1.464 +    LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
   1.465 +    goto error_cleanup;
   1.466 +  }
   1.467 +
   1.468 +  mSocket = nullptr;
   1.469 +  if (aUsingDtls) {
   1.470 +    mUsingDtls = true;
   1.471 +    usrsctp_register_address(static_cast<void *>(this));
   1.472 +    LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
   1.473 +  } else {
   1.474 +    mUsingDtls = false;
   1.475 +  }
   1.476 +  return true;
   1.477 +
   1.478 +error_cleanup:
   1.479 +  usrsctp_close(mMasterSocket);
   1.480 +  mMasterSocket = nullptr;
   1.481 +  mUsingDtls = false;
   1.482 +  return false;
   1.483 +}
   1.484 +
   1.485 +void
   1.486 +DataChannelConnection::StartDefer()
   1.487 +{
   1.488 +  nsresult rv;
   1.489 +  if (!NS_IsMainThread()) {
   1.490 +    NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
   1.491 +                            DataChannelOnMessageAvailable::START_DEFER,
   1.492 +                            this, (DataChannel *) nullptr));
   1.493 +    return;
   1.494 +  }
   1.495 +
   1.496 +  ASSERT_WEBRTC(NS_IsMainThread());
   1.497 +  if (!mDeferredTimer) {
   1.498 +    mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
   1.499 +    MOZ_ASSERT(mDeferredTimer);
   1.500 +  }
   1.501 +
   1.502 +  if (!mTimerRunning) {
   1.503 +    rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
   1.504 +                                          nsITimer::TYPE_ONE_SHOT);
   1.505 +    NS_ENSURE_TRUE_VOID(rv == NS_OK);
   1.506 +
   1.507 +    mTimerRunning = true;
   1.508 +  }
   1.509 +}
   1.510 +
   1.511 +// nsITimerCallback
   1.512 +
   1.513 +NS_IMETHODIMP
   1.514 +DataChannelConnection::Notify(nsITimer *timer)
   1.515 +{
   1.516 +  ASSERT_WEBRTC(NS_IsMainThread());
   1.517 +  LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout));
   1.518 +
   1.519 +  if (timer == mDeferredTimer) {
   1.520 +    if (SendDeferredMessages()) {
   1.521 +      // Still blocked
   1.522 +      // we don't need a lock, since this must be main thread...
   1.523 +      nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
   1.524 +                                                     nsITimer::TYPE_ONE_SHOT);
   1.525 +      if (NS_FAILED(rv)) {
   1.526 +        LOG(("%s: cannot initialize open timer", __FUNCTION__));
   1.527 +        // XXX and do....?
   1.528 +        return rv;
   1.529 +      }
   1.530 +      mTimerRunning = true;
   1.531 +    } else {
   1.532 +      LOG(("Turned off deferred send timer"));
   1.533 +      mTimerRunning = false;
   1.534 +    }
   1.535 +  }
   1.536 +  return NS_OK;
   1.537 +}
   1.538 +
   1.539 +#ifdef MOZ_PEERCONNECTION
   1.540 +void
   1.541 +DataChannelConnection::SetEvenOdd()
   1.542 +{
   1.543 +  ASSERT_WEBRTC(IsSTSThread());
   1.544 +
   1.545 +  TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
   1.546 +      mTransportFlow->GetLayer(TransportLayerDtls::ID()));
   1.547 +  MOZ_ASSERT(dtls);  // DTLS is mandatory
   1.548 +  mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
   1.549 +}
   1.550 +
   1.551 +bool
   1.552 +DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
   1.553 +{
   1.554 +  LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
   1.555 +
   1.556 +  NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
   1.557 +  NS_ENSURE_TRUE(aFlow, false);
   1.558 +
   1.559 +  mTransportFlow = aFlow;
   1.560 +  mLocalPort = localport;
   1.561 +  mRemotePort = remoteport;
   1.562 +  mState = CONNECTING;
   1.563 +
   1.564 +  RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this),
   1.565 +                                   &DataChannelConnection::SetSignals),
   1.566 +                NS_DISPATCH_NORMAL);
   1.567 +  return true;
   1.568 +}
   1.569 +
   1.570 +void
   1.571 +DataChannelConnection::SetSignals()
   1.572 +{
   1.573 +  ASSERT_WEBRTC(IsSTSThread());
   1.574 +  ASSERT_WEBRTC(mTransportFlow);
   1.575 +  LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
   1.576 +  mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
   1.577 +  // SignalStateChange() doesn't call you with the initial state
   1.578 +  mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
   1.579 +  CompleteConnect(mTransportFlow, mTransportFlow->state());
   1.580 +}
   1.581 +
   1.582 +void
   1.583 +DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
   1.584 +{
   1.585 +  LOG(("Data transport state: %d", state));
   1.586 +  MutexAutoLock lock(mLock);
   1.587 +  ASSERT_WEBRTC(IsSTSThread());
   1.588 +  // We should abort connection on TS_ERROR.
   1.589 +  // Note however that the association will also fail (perhaps with a delay) and
   1.590 +  // notify us in that way
   1.591 +  if (state != TransportLayer::TS_OPEN || !mMasterSocket)
   1.592 +    return;
   1.593 +
   1.594 +  struct sockaddr_conn addr;
   1.595 +  memset(&addr, 0, sizeof(addr));
   1.596 +  addr.sconn_family = AF_CONN;
   1.597 +#if defined(__Userspace_os_Darwin)
   1.598 +  addr.sconn_len = sizeof(addr);
   1.599 +#endif
   1.600 +  addr.sconn_port = htons(mLocalPort);
   1.601 +  addr.sconn_addr = static_cast<void *>(this);
   1.602 +
   1.603 +  LOG(("Calling usrsctp_bind"));
   1.604 +  int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
   1.605 +                       sizeof(addr));
   1.606 +  if (r < 0) {
   1.607 +    LOG(("usrsctp_bind failed: %d", r));
   1.608 +  } else {
   1.609 +    // This is the remote addr
   1.610 +    addr.sconn_port = htons(mRemotePort);
   1.611 +    LOG(("Calling usrsctp_connect"));
   1.612 +    r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
   1.613 +                        sizeof(addr));
   1.614 +    if (r < 0) {
   1.615 +      if (errno == EINPROGRESS) {
   1.616 +        // non-blocking
   1.617 +        return;
   1.618 +      } else {
   1.619 +        LOG(("usrsctp_connect failed: %d", errno));
   1.620 +        mState = CLOSED;
   1.621 +      }
   1.622 +    } else {
   1.623 +      // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
   1.624 +      // This also avoids issues with calling TransportFlow stuff on Mainthread
   1.625 +      return;
   1.626 +    }
   1.627 +  }
   1.628 +  // Note: currently this doesn't actually notify the application
   1.629 +  NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
   1.630 +                            DataChannelOnMessageAvailable::ON_CONNECTION,
   1.631 +                            this, false));
   1.632 +  return;
   1.633 +}
   1.634 +
   1.635 +// Process any pending Opens
   1.636 +void
   1.637 +DataChannelConnection::ProcessQueuedOpens()
   1.638 +{
   1.639 +  // The nsDeque holds channels with an AddRef applied.  Another reference
   1.640 +  // (may) be held by the DOMDataChannel, unless it's been GC'd.  No other
   1.641 +  // references should exist.
   1.642 +
   1.643 +  // Can't copy nsDeque's.  Move into temp array since any that fail will
   1.644 +  // go back to mPending
   1.645 +  nsDeque temp;
   1.646 +  DataChannel *temp_channel; // really already_AddRefed<>
   1.647 +  while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
   1.648 +    temp.Push(static_cast<void *>(temp_channel));
   1.649 +  }
   1.650 +
   1.651 +  nsRefPtr<DataChannel> channel;
   1.652 +  // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
   1.653 +  while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
   1.654 +    if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
   1.655 +      LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
   1.656 +      channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
   1.657 +      // OpenFinish returns a reference itself, so we need to take it can Release it
   1.658 +      channel = OpenFinish(channel.forget()); // may reset the flag and re-push
   1.659 +    } else {
   1.660 +      NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
   1.661 +    }
   1.662 +  }
   1.663 +
   1.664 +}
   1.665 +void
   1.666 +DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
   1.667 +                                     const unsigned char *data, size_t len)
   1.668 +{
   1.669 +#ifdef PR_LOGGING
   1.670 +  if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) {
   1.671 +    char *buf;
   1.672 +
   1.673 +    if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
   1.674 +      PR_LogPrint("%s", buf);
   1.675 +      usrsctp_freedumpbuffer(buf);
   1.676 +    }
   1.677 +  }
   1.678 +#endif
   1.679 +  // Pass the data to SCTP
   1.680 +  usrsctp_conninput(static_cast<void *>(this), data, len, 0);
   1.681 +}
   1.682 +
   1.683 +int
   1.684 +DataChannelConnection::SendPacket(const unsigned char *data, size_t len, bool release)
   1.685 +{
   1.686 +  //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
   1.687 +  int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
   1.688 +  if (release)
   1.689 +    delete data;
   1.690 +  return res;
   1.691 +}
   1.692 +
   1.693 +/* static */
   1.694 +int
   1.695 +DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
   1.696 +                                      uint8_t tos, uint8_t set_df)
   1.697 +{
   1.698 +  DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
   1.699 +  int res;
   1.700 +
   1.701 +#ifdef PR_LOGGING
   1.702 +  if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) {
   1.703 +    char *buf;
   1.704 +
   1.705 +    if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
   1.706 +      PR_LogPrint("%s", buf);
   1.707 +      usrsctp_freedumpbuffer(buf);
   1.708 +    }
   1.709 +  }
   1.710 +#endif
   1.711 +  // We're async proxying even if on the STSThread because this is called
   1.712 +  // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
   1.713 +  // SCTP has an option for Apple, on IP connections only, to release at least
   1.714 +  // one of the locks before calling a packet output routine; with changes to
   1.715 +  // the underlying SCTP stack this might remove the need to use an async proxy.
   1.716 +  if (0 /*peer->IsSTSThread()*/) {
   1.717 +    res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
   1.718 +  } else {
   1.719 +    unsigned char *data = new unsigned char[length];
   1.720 +    memcpy(data, buffer, length);
   1.721 +    res = -1;
   1.722 +    // XXX It might be worthwhile to add an assertion against the thread
   1.723 +    // somehow getting into the DataChannel/SCTP code again, as
   1.724 +    // DISPATCH_SYNC is not fully blocking.  This may be tricky, as it
   1.725 +    // needs to be a per-thread check, not a global.
   1.726 +    peer->mSTS->Dispatch(WrapRunnable(
   1.727 +                           nsRefPtr<DataChannelConnection>(peer),
   1.728 +                           &DataChannelConnection::SendPacket, data, length, true),
   1.729 +                         NS_DISPATCH_NORMAL);
   1.730 +    res = 0; // cheat!  Packets can always be dropped later anyways
   1.731 +  }
   1.732 +  return res;
   1.733 +}
   1.734 +#endif
   1.735 +
   1.736 +#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
   1.737 +// listen for incoming associations
   1.738 +// Blocks! - Don't call this from main thread!
   1.739 +
   1.740 +#error This code will not work as-is since SetEvenOdd() runs on Mainthread
   1.741 +
   1.742 +bool
   1.743 +DataChannelConnection::Listen(unsigned short port)
   1.744 +{
   1.745 +  struct sockaddr_in addr;
   1.746 +  socklen_t addr_len;
   1.747 +
   1.748 +  NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
   1.749 +
   1.750 +  /* Acting as the 'server' */
   1.751 +  memset((void *)&addr, 0, sizeof(addr));
   1.752 +#ifdef HAVE_SIN_LEN
   1.753 +  addr.sin_len = sizeof(struct sockaddr_in);
   1.754 +#endif
   1.755 +  addr.sin_family = AF_INET;
   1.756 +  addr.sin_port = htons(port);
   1.757 +  addr.sin_addr.s_addr = htonl(INADDR_ANY);
   1.758 +  LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
   1.759 +  mState = CONNECTING;
   1.760 +  if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
   1.761 +    LOG(("***Failed userspace_bind"));
   1.762 +    return false;
   1.763 +  }
   1.764 +  if (usrsctp_listen(mMasterSocket, 1) < 0) {
   1.765 +    LOG(("***Failed userspace_listen"));
   1.766 +    return false;
   1.767 +  }
   1.768 +
   1.769 +  LOG(("Accepting connection"));
   1.770 +  addr_len = 0;
   1.771 +  if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
   1.772 +    LOG(("***Failed accept"));
   1.773 +    return false;
   1.774 +  }
   1.775 +  mState = OPEN;
   1.776 +
   1.777 +  struct linger l;
   1.778 +  l.l_onoff = 1;
   1.779 +  l.l_linger = 0;
   1.780 +  if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
   1.781 +                         (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
   1.782 +    LOG(("Couldn't set SO_LINGER on SCTP socket"));
   1.783 +  }
   1.784 +
   1.785 +  SetEvenOdd();
   1.786 +
   1.787 +  // Notify Connection open
   1.788 +  // XXX We need to make sure connection sticks around until the message is delivered
   1.789 +  LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
   1.790 +  NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
   1.791 +                            DataChannelOnMessageAvailable::ON_CONNECTION,
   1.792 +                            this, (DataChannel *) nullptr));
   1.793 +  return true;
   1.794 +}
   1.795 +
   1.796 +// Blocks! - Don't call this from main thread!
   1.797 +bool
   1.798 +DataChannelConnection::Connect(const char *addr, unsigned short port)
   1.799 +{
   1.800 +  struct sockaddr_in addr4;
   1.801 +  struct sockaddr_in6 addr6;
   1.802 +
   1.803 +  NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
   1.804 +
   1.805 +  /* Acting as the connector */
   1.806 +  LOG(("Connecting to %s, port %u", addr, port));
   1.807 +  memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
   1.808 +  memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
   1.809 +#ifdef HAVE_SIN_LEN
   1.810 +  addr4.sin_len = sizeof(struct sockaddr_in);
   1.811 +#endif
   1.812 +#ifdef HAVE_SIN6_LEN
   1.813 +  addr6.sin6_len = sizeof(struct sockaddr_in6);
   1.814 +#endif
   1.815 +  addr4.sin_family = AF_INET;
   1.816 +  addr6.sin6_family = AF_INET6;
   1.817 +  addr4.sin_port = htons(port);
   1.818 +  addr6.sin6_port = htons(port);
   1.819 +  mState = CONNECTING;
   1.820 +
   1.821 +#if !defined(__Userspace_os_Windows)
   1.822 +  if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
   1.823 +    if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
   1.824 +      LOG(("*** Failed userspace_connect"));
   1.825 +      return false;
   1.826 +    }
   1.827 +  } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
   1.828 +    if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
   1.829 +      LOG(("*** Failed userspace_connect"));
   1.830 +      return false;
   1.831 +    }
   1.832 +  } else {
   1.833 +    LOG(("*** Illegal destination address."));
   1.834 +  }
   1.835 +#else
   1.836 +  {
   1.837 +    struct sockaddr_storage ss;
   1.838 +    int sslen = sizeof(ss);
   1.839 +
   1.840 +    if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
   1.841 +      addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
   1.842 +      if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
   1.843 +        LOG(("*** Failed userspace_connect"));
   1.844 +        return false;
   1.845 +      }
   1.846 +    } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
   1.847 +      addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
   1.848 +      if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
   1.849 +        LOG(("*** Failed userspace_connect"));
   1.850 +        return false;
   1.851 +      }
   1.852 +    } else {
   1.853 +      LOG(("*** Illegal destination address."));
   1.854 +    }
   1.855 +  }
   1.856 +#endif
   1.857 +
   1.858 +  mSocket = mMasterSocket;
   1.859 +
   1.860 +  LOG(("connect() succeeded!  Entering connected mode"));
   1.861 +  mState = OPEN;
   1.862 +
   1.863 +  SetEvenOdd();
   1.864 +
   1.865 +  // Notify Connection open
   1.866 +  // XXX We need to make sure connection sticks around until the message is delivered
   1.867 +  LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
   1.868 +  NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
   1.869 +                            DataChannelOnMessageAvailable::ON_CONNECTION,
   1.870 +                            this, (DataChannel *) nullptr));
   1.871 +  return true;
   1.872 +}
   1.873 +#endif
   1.874 +
   1.875 +DataChannel *
   1.876 +DataChannelConnection::FindChannelByStream(uint16_t stream)
   1.877 +{
   1.878 +  return mStreams.SafeElementAt(stream);
   1.879 +}
   1.880 +
   1.881 +uint16_t
   1.882 +DataChannelConnection::FindFreeStream()
   1.883 +{
   1.884 +  uint32_t i, j, limit;
   1.885 +
   1.886 +  limit = mStreams.Length();
   1.887 +  if (limit > MAX_NUM_STREAMS)
   1.888 +    limit = MAX_NUM_STREAMS;
   1.889 +
   1.890 +  for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
   1.891 +    if (!mStreams[i]) {
   1.892 +      // Verify it's not still in the process of closing
   1.893 +      for (j = 0; j < mStreamsResetting.Length(); ++j) {
   1.894 +        if (mStreamsResetting[j] == i) {
   1.895 +          break;
   1.896 +        }
   1.897 +      }
   1.898 +      if (j == mStreamsResetting.Length())
   1.899 +        break;
   1.900 +    }
   1.901 +  }
   1.902 +  if (i >= limit) {
   1.903 +    return INVALID_STREAM;
   1.904 +  }
   1.905 +  return i;
   1.906 +}
   1.907 +
   1.908 +bool
   1.909 +DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
   1.910 +{
   1.911 +  struct sctp_status status;
   1.912 +  struct sctp_add_streams sas;
   1.913 +  uint32_t outStreamsNeeded;
   1.914 +  socklen_t len;
   1.915 +
   1.916 +  if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
   1.917 +    aNeeded = MAX_NUM_STREAMS - mStreams.Length();
   1.918 +  }
   1.919 +  if (aNeeded <= 0) {
   1.920 +    return false;
   1.921 +  }
   1.922 +
   1.923 +  len = (socklen_t)sizeof(struct sctp_status);
   1.924 +  if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
   1.925 +    LOG(("***failed: getsockopt SCTP_STATUS"));
   1.926 +    return false;
   1.927 +  }
   1.928 +  outStreamsNeeded = aNeeded; // number to add
   1.929 +
   1.930 +  // Note: if multiple channel opens happen when we don't have enough space,
   1.931 +  // we'll call RequestMoreStreams() multiple times
   1.932 +  memset(&sas, 0, sizeof(sas));
   1.933 +  sas.sas_instrms = 0;
   1.934 +  sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
   1.935 +  // Doesn't block, we get an event when it succeeds or fails
   1.936 +  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
   1.937 +                         (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
   1.938 +    if (errno == EALREADY) {
   1.939 +      LOG(("Already have %u output streams", outStreamsNeeded));
   1.940 +      return true;
   1.941 +    }
   1.942 +
   1.943 +    LOG(("***failed: setsockopt ADD errno=%d", errno));
   1.944 +    return false;
   1.945 +  }
   1.946 +  LOG(("Requested %u more streams", outStreamsNeeded));
   1.947 +  // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
   1.948 +  // values are larger than mStreams.Length()
   1.949 +  return true;
   1.950 +}
   1.951 +
   1.952 +int32_t
   1.953 +DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
   1.954 +{
   1.955 +  struct sctp_sndinfo sndinfo;
   1.956 +
   1.957 +  // Note: Main-thread IO, but doesn't block
   1.958 +  memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
   1.959 +  sndinfo.snd_sid = stream;
   1.960 +  sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
   1.961 +  if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
   1.962 +                    &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
   1.963 +                    SCTP_SENDV_SNDINFO, 0) < 0) {
   1.964 +    //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
   1.965 +    return (0);
   1.966 +  }
   1.967 +  return (1);
   1.968 +}
   1.969 +
   1.970 +int32_t
   1.971 +DataChannelConnection::SendOpenAckMessage(uint16_t stream)
   1.972 +{
   1.973 +  struct rtcweb_datachannel_ack ack;
   1.974 +
   1.975 +  memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
   1.976 +  ack.msg_type = DATA_CHANNEL_ACK;
   1.977 +
   1.978 +  return SendControlMessage(&ack, sizeof(ack), stream);
   1.979 +}
   1.980 +
   1.981 +int32_t
   1.982 +DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
   1.983 +                                              const nsACString& protocol,
   1.984 +                                              uint16_t stream, bool unordered,
   1.985 +                                              uint16_t prPolicy, uint32_t prValue)
   1.986 +{
   1.987 +  int label_len = label.Length(); // not including nul
   1.988 +  int proto_len = protocol.Length(); // not including nul
   1.989 +  struct rtcweb_datachannel_open_request *req =
   1.990 +    (struct rtcweb_datachannel_open_request*) moz_xmalloc((sizeof(*req)-1) + label_len + proto_len);
   1.991 +   // careful - request includes 1 char label
   1.992 +
   1.993 +  memset(req, 0, sizeof(struct rtcweb_datachannel_open_request));
   1.994 +  req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
   1.995 +  switch (prPolicy) {
   1.996 +  case SCTP_PR_SCTP_NONE:
   1.997 +    req->channel_type = DATA_CHANNEL_RELIABLE;
   1.998 +    break;
   1.999 +  case SCTP_PR_SCTP_TTL:
  1.1000 +    req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
  1.1001 +    break;
  1.1002 +  case SCTP_PR_SCTP_RTX:
  1.1003 +    req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
  1.1004 +    break;
  1.1005 +  default:
  1.1006 +    // FIX! need to set errno!  Or make all these SendXxxx() funcs return 0 or errno!
  1.1007 +    moz_free(req);
  1.1008 +    return (0);
  1.1009 +  }
  1.1010 +  if (unordered) {
  1.1011 +    // Per the current types, all differ by 0x80 between ordered and unordered
  1.1012 +    req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
  1.1013 +  }
  1.1014 +
  1.1015 +  req->reliability_param = htonl(prValue);
  1.1016 +  req->priority = htons(0); /* XXX: add support */
  1.1017 +  req->label_length = htons(label_len);
  1.1018 +  req->protocol_length = htons(proto_len);
  1.1019 +  memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
  1.1020 +  memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
  1.1021 +
  1.1022 +  // sizeof(*req) already includes +1 byte for label, need nul for both strings
  1.1023 +  int32_t result = SendControlMessage(req, (sizeof(*req)-1) + label_len + proto_len, stream);
  1.1024 +
  1.1025 +  moz_free(req);
  1.1026 +  return result;
  1.1027 +}
  1.1028 +
  1.1029 +// XXX This should use a separate thread (outbound queue) which should
  1.1030 +// select() to know when to *try* to send data to the socket again.
  1.1031 +// Alternatively, it can use a timeout, but that's guaranteed to be wrong
  1.1032 +// (just not sure in what direction).  We could re-implement NSPR's
  1.1033 +// PR_POLL_WRITE/etc handling... with a lot of work.
  1.1034 +
  1.1035 +// Better yet, use the SCTP stack's notifications on buffer state to avoid
  1.1036 +// filling the SCTP's buffers.
  1.1037 +
  1.1038 +// returns if we're still blocked or not
  1.1039 +bool
  1.1040 +DataChannelConnection::SendDeferredMessages()
  1.1041 +{
  1.1042 +  uint32_t i;
  1.1043 +  nsRefPtr<DataChannel> channel; // we may null out the refs to this
  1.1044 +  bool still_blocked = false;
  1.1045 +  bool sent = false;
  1.1046 +
  1.1047 +  // This may block while something is modifying channels, but should not block for IO
  1.1048 +  MutexAutoLock lock(mLock);
  1.1049 +
  1.1050 +  // XXX For total fairness, on a still_blocked we'd start next time at the
  1.1051 +  // same index.  Sorry, not going to bother for now.
  1.1052 +  for (i = 0; i < mStreams.Length(); ++i) {
  1.1053 +    channel = mStreams[i];
  1.1054 +    if (!channel)
  1.1055 +      continue;
  1.1056 +
  1.1057 +    // Only one of these should be set....
  1.1058 +    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
  1.1059 +      if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
  1.1060 +                                 channel->mStream,
  1.1061 +                                 channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
  1.1062 +                                 channel->mPrPolicy, channel->mPrValue)) {
  1.1063 +        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
  1.1064 +
  1.1065 +        channel->mState = OPEN;
  1.1066 +        channel->mReady = true;
  1.1067 +        LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  1.1068 +        NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.1069 +                                  DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
  1.1070 +                                  channel));
  1.1071 +        sent = true;
  1.1072 +      } else {
  1.1073 +        if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1.1074 +          still_blocked = true;
  1.1075 +        } else {
  1.1076 +          // Close the channel, inform the user
  1.1077 +          mStreams[channel->mStream] = nullptr;
  1.1078 +          channel->mState = CLOSED;
  1.1079 +          // Don't need to reset; we didn't open it
  1.1080 +          NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.1081 +                                    DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1.1082 +                                    channel));
  1.1083 +        }
  1.1084 +      }
  1.1085 +    }
  1.1086 +    if (still_blocked)
  1.1087 +      break;
  1.1088 +
  1.1089 +    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
  1.1090 +      if (SendOpenAckMessage(channel->mStream)) {
  1.1091 +        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
  1.1092 +        sent = true;
  1.1093 +      } else {
  1.1094 +        if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1.1095 +          still_blocked = true;
  1.1096 +        } else {
  1.1097 +          // Close the channel, inform the user
  1.1098 +          CloseInt(channel);
  1.1099 +          // XXX send error via DataChannelOnMessageAvailable (bug 843625)
  1.1100 +        }
  1.1101 +      }
  1.1102 +    }
  1.1103 +    if (still_blocked)
  1.1104 +      break;
  1.1105 +
  1.1106 +    if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
  1.1107 +      bool failed_send = false;
  1.1108 +      int32_t result;
  1.1109 +
  1.1110 +      if (channel->mState == CLOSED || channel->mState == CLOSING) {
  1.1111 +        channel->mBufferedData.Clear();
  1.1112 +      }
  1.1113 +      while (!channel->mBufferedData.IsEmpty() &&
  1.1114 +             !failed_send) {
  1.1115 +        struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
  1.1116 +        const char *data           = channel->mBufferedData[0]->mData;
  1.1117 +        uint32_t len               = channel->mBufferedData[0]->mLength;
  1.1118 +
  1.1119 +        // SCTP will return EMSGSIZE if the message is bigger than the buffer
  1.1120 +        // size (or EAGAIN if there isn't space)
  1.1121 +        if ((result = usrsctp_sendv(mSocket, data, len,
  1.1122 +                                    nullptr, 0,
  1.1123 +                                    (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
  1.1124 +                                    SCTP_SENDV_SPA,
  1.1125 +                                    0) < 0)) {
  1.1126 +          if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1.1127 +            // leave queued for resend
  1.1128 +            failed_send = true;
  1.1129 +            LOG(("queue full again when resending %d bytes (%d)", len, result));
  1.1130 +          } else {
  1.1131 +            LOG(("error %d re-sending string", errno));
  1.1132 +            failed_send = true;
  1.1133 +          }
  1.1134 +        } else {
  1.1135 +          LOG(("Resent buffer of %d bytes (%d)", len, result));
  1.1136 +          sent = true;
  1.1137 +          channel->mBufferedData.RemoveElementAt(0);
  1.1138 +        }
  1.1139 +      }
  1.1140 +      if (channel->mBufferedData.IsEmpty())
  1.1141 +        channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
  1.1142 +      else
  1.1143 +        still_blocked = true;
  1.1144 +    }
  1.1145 +    if (still_blocked)
  1.1146 +      break;
  1.1147 +  }
  1.1148 +
  1.1149 +  if (!still_blocked) {
  1.1150 +    // mDeferTimeout becomes an estimate of how long we need to wait next time we block
  1.1151 +    return false;
  1.1152 +  }
  1.1153 +  // adjust time?  More time for next wait if we didn't send anything, less if did
  1.1154 +  // Pretty crude, but better than nothing; just to keep CPU use down
  1.1155 +  if (!sent && mDeferTimeout < 50)
  1.1156 +    mDeferTimeout++;
  1.1157 +  else if (sent && mDeferTimeout > 10)
  1.1158 +    mDeferTimeout--;
  1.1159 +
  1.1160 +  return true;
  1.1161 +}
  1.1162 +
  1.1163 +void
  1.1164 +DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
  1.1165 +                                                size_t length,
  1.1166 +                                                uint16_t stream)
  1.1167 +{
  1.1168 +  nsRefPtr<DataChannel> channel;
  1.1169 +  uint32_t prValue;
  1.1170 +  uint16_t prPolicy;
  1.1171 +  uint32_t flags;
  1.1172 +
  1.1173 +  mLock.AssertCurrentThreadOwns();
  1.1174 +
  1.1175 +  if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
  1.1176 +    LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
  1.1177 +         (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
  1.1178 +    if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
  1.1179 +      return;
  1.1180 +  }
  1.1181 +
  1.1182 +  LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
  1.1183 +
  1.1184 +  switch (req->channel_type) {
  1.1185 +    case DATA_CHANNEL_RELIABLE:
  1.1186 +    case DATA_CHANNEL_RELIABLE_UNORDERED:
  1.1187 +      prPolicy = SCTP_PR_SCTP_NONE;
  1.1188 +      break;
  1.1189 +    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
  1.1190 +    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
  1.1191 +      prPolicy = SCTP_PR_SCTP_RTX;
  1.1192 +      break;
  1.1193 +    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
  1.1194 +    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
  1.1195 +      prPolicy = SCTP_PR_SCTP_TTL;
  1.1196 +      break;
  1.1197 +    default:
  1.1198 +      LOG(("Unknown channel type", req->channel_type));
  1.1199 +      /* XXX error handling */
  1.1200 +      return;
  1.1201 +  }
  1.1202 +  prValue = ntohl(req->reliability_param);
  1.1203 +  flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
  1.1204 +
  1.1205 +  if ((channel = FindChannelByStream(stream))) {
  1.1206 +    if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
  1.1207 +      LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
  1.1208 +           stream, channel->mState));
  1.1209 +     /* XXX: some error handling */
  1.1210 +    } else {
  1.1211 +      LOG(("Open for externally negotiated channel %u", stream));
  1.1212 +      // XXX should also check protocol, maybe label
  1.1213 +      if (prPolicy != channel->mPrPolicy ||
  1.1214 +          prValue != channel->mPrValue ||
  1.1215 +          flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
  1.1216 +      {
  1.1217 +        LOG(("WARNING: external negotiation mismatch with OpenRequest:"
  1.1218 +             "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
  1.1219 +             stream, prPolicy, channel->mPrPolicy,
  1.1220 +             prValue, channel->mPrValue, flags, channel->mFlags));
  1.1221 +      }
  1.1222 +    }
  1.1223 +    return;
  1.1224 +  }
  1.1225 +  if (stream >= mStreams.Length()) {
  1.1226 +    LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
  1.1227 +    return;
  1.1228 +  }
  1.1229 +
  1.1230 +  nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
  1.1231 +  nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
  1.1232 +                                           ntohs(req->protocol_length)));
  1.1233 +
  1.1234 +  channel = new DataChannel(this,
  1.1235 +                            stream,
  1.1236 +                            DataChannel::CONNECTING,
  1.1237 +                            label,
  1.1238 +                            protocol,
  1.1239 +                            prPolicy, prValue,
  1.1240 +                            flags,
  1.1241 +                            nullptr, nullptr);
  1.1242 +  mStreams[stream] = channel;
  1.1243 +
  1.1244 +  channel->mState = DataChannel::WAITING_TO_OPEN;
  1.1245 +
  1.1246 +  LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
  1.1247 +       channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
  1.1248 +  NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.1249 +                            DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
  1.1250 +                            this, channel));
  1.1251 +
  1.1252 +  LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  1.1253 +
  1.1254 +  if (!SendOpenAckMessage(stream)) {
  1.1255 +    // XXX Only on EAGAIN!?  And if not, then close the channel??
  1.1256 +    channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
  1.1257 +    StartDefer();
  1.1258 +  }
  1.1259 +
  1.1260 +  // Now process any queued data messages for the channel (which will
  1.1261 +  // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
  1.1262 +  // more that come in before that happens)
  1.1263 +  DeliverQueuedData(stream);
  1.1264 +}
  1.1265 +
  1.1266 +// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
  1.1267 +// That would make this code moot.  Keep it for now for backwards compatibility.
  1.1268 +void
  1.1269 +DataChannelConnection::DeliverQueuedData(uint16_t stream)
  1.1270 +{
  1.1271 +  mLock.AssertCurrentThreadOwns();
  1.1272 +
  1.1273 +  uint32_t i = 0;
  1.1274 +  while (i < mQueuedData.Length()) {
  1.1275 +    // Careful! we may modify the array length from within the loop!
  1.1276 +    if (mQueuedData[i]->mStream == stream) {
  1.1277 +      LOG(("Delivering queued data for stream %u, length %u",
  1.1278 +           stream, mQueuedData[i]->mLength));
  1.1279 +      // Deliver the queued data
  1.1280 +      HandleDataMessage(mQueuedData[i]->mPpid,
  1.1281 +                        mQueuedData[i]->mData, mQueuedData[i]->mLength,
  1.1282 +                        mQueuedData[i]->mStream);
  1.1283 +      mQueuedData.RemoveElementAt(i);
  1.1284 +      continue; // don't bump index since we removed the element
  1.1285 +    }
  1.1286 +    i++;
  1.1287 +  }
  1.1288 +}
  1.1289 +
  1.1290 +void
  1.1291 +DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
  1.1292 +                                            size_t length, uint16_t stream)
  1.1293 +{
  1.1294 +  DataChannel *channel;
  1.1295 +
  1.1296 +  mLock.AssertCurrentThreadOwns();
  1.1297 +
  1.1298 +  channel = FindChannelByStream(stream);
  1.1299 +  NS_ENSURE_TRUE_VOID(channel);
  1.1300 +
  1.1301 +  LOG(("OpenAck received for stream %u, waiting=%d", stream,
  1.1302 +       (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
  1.1303 +
  1.1304 +  channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
  1.1305 +}
  1.1306 +
  1.1307 +void
  1.1308 +DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
  1.1309 +{
  1.1310 +  /* XXX: Send an error message? */
  1.1311 +  LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
  1.1312 +  // XXX Log to JS error console if possible
  1.1313 +}
  1.1314 +
  1.1315 +void
  1.1316 +DataChannelConnection::HandleDataMessage(uint32_t ppid,
  1.1317 +                                         const void *data, size_t length,
  1.1318 +                                         uint16_t stream)
  1.1319 +{
  1.1320 +  DataChannel *channel;
  1.1321 +  const char *buffer = (const char *) data;
  1.1322 +
  1.1323 +  mLock.AssertCurrentThreadOwns();
  1.1324 +
  1.1325 +  channel = FindChannelByStream(stream);
  1.1326 +
  1.1327 +  // XXX A closed channel may trip this... check
  1.1328 +  // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
  1.1329 +  // That would make this code moot.  Keep it for now for backwards compatibility.
  1.1330 +  if (!channel) {
  1.1331 +    // In the updated 0-RTT open case, the sender can send data immediately
  1.1332 +    // after Open, and doesn't set the in-order bit (since we don't have a
  1.1333 +    // response or ack).  Also, with external negotiation, data can come in
  1.1334 +    // before we're told about the external negotiation.  We need to buffer
  1.1335 +    // data until either a) Open comes in, if the ordering get messed up,
  1.1336 +    // or b) the app tells us this channel was externally negotiated.  When
  1.1337 +    // these occur, we deliver the data.
  1.1338 +
  1.1339 +    // Since this is rare and non-performance, keep a single list of queued
  1.1340 +    // data messages to deliver once the channel opens.
  1.1341 +    LOG(("Queuing data for stream %u, length %u", stream, length));
  1.1342 +    // Copies data
  1.1343 +    mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
  1.1344 +    return;
  1.1345 +  }
  1.1346 +
  1.1347 +  // XXX should this be a simple if, no warnings/debugbreaks?
  1.1348 +  NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
  1.1349 +
  1.1350 +  {
  1.1351 +    nsAutoCString recvData(buffer, length); // copies (<64) or allocates
  1.1352 +    bool is_binary = true;
  1.1353 +
  1.1354 +    if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
  1.1355 +        ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
  1.1356 +      is_binary = false;
  1.1357 +    }
  1.1358 +    if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
  1.1359 +      NS_WARNING("DataChannel message aborted by fragment type change!");
  1.1360 +      channel->mRecvBuffer.Truncate(0);
  1.1361 +    }
  1.1362 +    channel->mIsRecvBinary = is_binary;
  1.1363 +
  1.1364 +    switch (ppid) {
  1.1365 +      case DATA_CHANNEL_PPID_DOMSTRING:
  1.1366 +      case DATA_CHANNEL_PPID_BINARY:
  1.1367 +        channel->mRecvBuffer += recvData;
  1.1368 +        LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
  1.1369 +             is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
  1.1370 +             channel->mStream));
  1.1371 +        return; // Not ready to notify application
  1.1372 +
  1.1373 +      case DATA_CHANNEL_PPID_DOMSTRING_LAST:
  1.1374 +        LOG(("DataChannel: String message received of length %lu on channel %u",
  1.1375 +             length, channel->mStream));
  1.1376 +        if (!channel->mRecvBuffer.IsEmpty()) {
  1.1377 +          channel->mRecvBuffer += recvData;
  1.1378 +          LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
  1.1379 +          channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1.1380 +                                 DataChannelOnMessageAvailable::ON_DATA, this,
  1.1381 +                                 channel, channel->mRecvBuffer, -1));
  1.1382 +          channel->mRecvBuffer.Truncate(0);
  1.1383 +          return;
  1.1384 +        }
  1.1385 +        // else send using recvData normally
  1.1386 +        length = -1; // Flag for DOMString
  1.1387 +
  1.1388 +        // WebSockets checks IsUTF8() here; we can try to deliver it
  1.1389 +        break;
  1.1390 +
  1.1391 +      case DATA_CHANNEL_PPID_BINARY_LAST:
  1.1392 +        LOG(("DataChannel: Received binary message of length %lu on channel id %u",
  1.1393 +             length, channel->mStream));
  1.1394 +        if (!channel->mRecvBuffer.IsEmpty()) {
  1.1395 +          channel->mRecvBuffer += recvData;
  1.1396 +          LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
  1.1397 +          channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1.1398 +                                 DataChannelOnMessageAvailable::ON_DATA, this,
  1.1399 +                                 channel, channel->mRecvBuffer,
  1.1400 +                                 channel->mRecvBuffer.Length()));
  1.1401 +          channel->mRecvBuffer.Truncate(0);
  1.1402 +          return;
  1.1403 +        }
  1.1404 +        // else send using recvData normally
  1.1405 +        break;
  1.1406 +
  1.1407 +      default:
  1.1408 +        NS_ERROR("Unknown data PPID");
  1.1409 +        return;
  1.1410 +    }
  1.1411 +    /* Notify onmessage */
  1.1412 +    LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
  1.1413 +    channel->SendOrQueue(new DataChannelOnMessageAvailable(
  1.1414 +                           DataChannelOnMessageAvailable::ON_DATA, this,
  1.1415 +                           channel, recvData, length));
  1.1416 +  }
  1.1417 +}
  1.1418 +
  1.1419 +// Called with mLock locked!
  1.1420 +void
  1.1421 +DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
  1.1422 +{
  1.1423 +  const struct rtcweb_datachannel_open_request *req;
  1.1424 +  const struct rtcweb_datachannel_ack *ack;
  1.1425 +
  1.1426 +  mLock.AssertCurrentThreadOwns();
  1.1427 +
  1.1428 +  switch (ppid) {
  1.1429 +    case DATA_CHANNEL_PPID_CONTROL:
  1.1430 +      req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
  1.1431 +
  1.1432 +      NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
  1.1433 +      switch (req->msg_type) {
  1.1434 +        case DATA_CHANNEL_OPEN_REQUEST:
  1.1435 +          // structure includes a possibly-unused char label[1] (in a packed structure)
  1.1436 +          NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
  1.1437 +
  1.1438 +          HandleOpenRequestMessage(req, length, stream);
  1.1439 +          break;
  1.1440 +        case DATA_CHANNEL_ACK:
  1.1441 +          // >= sizeof(*ack) checked above
  1.1442 +
  1.1443 +          ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
  1.1444 +          HandleOpenAckMessage(ack, length, stream);
  1.1445 +          break;
  1.1446 +        default:
  1.1447 +          HandleUnknownMessage(ppid, length, stream);
  1.1448 +          break;
  1.1449 +      }
  1.1450 +      break;
  1.1451 +    case DATA_CHANNEL_PPID_DOMSTRING:
  1.1452 +    case DATA_CHANNEL_PPID_DOMSTRING_LAST:
  1.1453 +    case DATA_CHANNEL_PPID_BINARY:
  1.1454 +    case DATA_CHANNEL_PPID_BINARY_LAST:
  1.1455 +      HandleDataMessage(ppid, buffer, length, stream);
  1.1456 +      break;
  1.1457 +    default:
  1.1458 +      LOG(("Message of length %lu, PPID %u on stream %u received.",
  1.1459 +           length, ppid, stream));
  1.1460 +      break;
  1.1461 +  }
  1.1462 +}
  1.1463 +
  1.1464 +void
  1.1465 +DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
  1.1466 +{
  1.1467 +  uint32_t i, n;
  1.1468 +
  1.1469 +  switch (sac->sac_state) {
  1.1470 +  case SCTP_COMM_UP:
  1.1471 +    LOG(("Association change: SCTP_COMM_UP"));
  1.1472 +    if (mState == CONNECTING) {
  1.1473 +      mSocket = mMasterSocket;
  1.1474 +      mState = OPEN;
  1.1475 +
  1.1476 +      SetEvenOdd();
  1.1477 +
  1.1478 +      NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.1479 +                                DataChannelOnMessageAvailable::ON_CONNECTION,
  1.1480 +                                this, true));
  1.1481 +      LOG(("DTLS connect() succeeded!  Entering connected mode"));
  1.1482 +
  1.1483 +      // Open any streams pending...
  1.1484 +      ProcessQueuedOpens();
  1.1485 +
  1.1486 +    } else if (mState == OPEN) {
  1.1487 +      LOG(("DataConnection Already OPEN"));
  1.1488 +    } else {
  1.1489 +      LOG(("Unexpected state: %d", mState));
  1.1490 +    }
  1.1491 +    break;
  1.1492 +  case SCTP_COMM_LOST:
  1.1493 +    LOG(("Association change: SCTP_COMM_LOST"));
  1.1494 +    // This association is toast, so also close all the channels -- from mainthread!
  1.1495 +    NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.1496 +                              DataChannelOnMessageAvailable::ON_DISCONNECTED,
  1.1497 +                              this));
  1.1498 +    break;
  1.1499 +  case SCTP_RESTART:
  1.1500 +    LOG(("Association change: SCTP_RESTART"));
  1.1501 +    break;
  1.1502 +  case SCTP_SHUTDOWN_COMP:
  1.1503 +    LOG(("Association change: SCTP_SHUTDOWN_COMP"));
  1.1504 +    NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.1505 +                              DataChannelOnMessageAvailable::ON_DISCONNECTED,
  1.1506 +                              this));
  1.1507 +    break;
  1.1508 +  case SCTP_CANT_STR_ASSOC:
  1.1509 +    LOG(("Association change: SCTP_CANT_STR_ASSOC"));
  1.1510 +    break;
  1.1511 +  default:
  1.1512 +    LOG(("Association change: UNKNOWN"));
  1.1513 +    break;
  1.1514 +  }
  1.1515 +  LOG(("Association change: streams (in/out) = (%u/%u)",
  1.1516 +       sac->sac_inbound_streams, sac->sac_outbound_streams));
  1.1517 +
  1.1518 +  NS_ENSURE_TRUE_VOID(sac);
  1.1519 +  n = sac->sac_length - sizeof(*sac);
  1.1520 +  if (((sac->sac_state == SCTP_COMM_UP) ||
  1.1521 +        (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
  1.1522 +    for (i = 0; i < n; ++i) {
  1.1523 +      switch (sac->sac_info[i]) {
  1.1524 +      case SCTP_ASSOC_SUPPORTS_PR:
  1.1525 +        LOG(("Supports: PR"));
  1.1526 +        break;
  1.1527 +      case SCTP_ASSOC_SUPPORTS_AUTH:
  1.1528 +        LOG(("Supports: AUTH"));
  1.1529 +        break;
  1.1530 +      case SCTP_ASSOC_SUPPORTS_ASCONF:
  1.1531 +        LOG(("Supports: ASCONF"));
  1.1532 +        break;
  1.1533 +      case SCTP_ASSOC_SUPPORTS_MULTIBUF:
  1.1534 +        LOG(("Supports: MULTIBUF"));
  1.1535 +        break;
  1.1536 +      case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
  1.1537 +        LOG(("Supports: RE-CONFIG"));
  1.1538 +        break;
  1.1539 +      default:
  1.1540 +        LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
  1.1541 +        break;
  1.1542 +      }
  1.1543 +    }
  1.1544 +  } else if (((sac->sac_state == SCTP_COMM_LOST) ||
  1.1545 +              (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
  1.1546 +    LOG(("Association: ABORT ="));
  1.1547 +    for (i = 0; i < n; ++i) {
  1.1548 +      LOG((" 0x%02x", sac->sac_info[i]));
  1.1549 +    }
  1.1550 +  }
  1.1551 +  if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
  1.1552 +      (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
  1.1553 +      (sac->sac_state == SCTP_COMM_LOST)) {
  1.1554 +    return;
  1.1555 +  }
  1.1556 +}
  1.1557 +
  1.1558 +void
  1.1559 +DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
  1.1560 +{
  1.1561 +  char addr_buf[INET6_ADDRSTRLEN];
  1.1562 +  const char *addr = "";
  1.1563 +  struct sockaddr_in *sin;
  1.1564 +  struct sockaddr_in6 *sin6;
  1.1565 +#if defined(__Userspace_os_Windows)
  1.1566 +  DWORD addr_len = INET6_ADDRSTRLEN;
  1.1567 +#endif
  1.1568 +
  1.1569 +  switch (spc->spc_aaddr.ss_family) {
  1.1570 +  case AF_INET:
  1.1571 +    sin = (struct sockaddr_in *)&spc->spc_aaddr;
  1.1572 +#if !defined(__Userspace_os_Windows)
  1.1573 +    addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
  1.1574 +#else
  1.1575 +    if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr,
  1.1576 +                            addr_buf, &addr_len)) {
  1.1577 +      return;
  1.1578 +    }
  1.1579 +#endif
  1.1580 +    break;
  1.1581 +  case AF_INET6:
  1.1582 +    sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
  1.1583 +#if !defined(__Userspace_os_Windows)
  1.1584 +    addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
  1.1585 +#else
  1.1586 +    if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr,
  1.1587 +                            addr_buf, &addr_len)) {
  1.1588 +      return;
  1.1589 +    }
  1.1590 +#endif
  1.1591 +  case AF_CONN:
  1.1592 +    addr = "DTLS connection";
  1.1593 +    break;
  1.1594 +  default:
  1.1595 +    break;
  1.1596 +  }
  1.1597 +  LOG(("Peer address %s is now ", addr));
  1.1598 +  switch (spc->spc_state) {
  1.1599 +  case SCTP_ADDR_AVAILABLE:
  1.1600 +    LOG(("SCTP_ADDR_AVAILABLE"));
  1.1601 +    break;
  1.1602 +  case SCTP_ADDR_UNREACHABLE:
  1.1603 +    LOG(("SCTP_ADDR_UNREACHABLE"));
  1.1604 +    break;
  1.1605 +  case SCTP_ADDR_REMOVED:
  1.1606 +    LOG(("SCTP_ADDR_REMOVED"));
  1.1607 +    break;
  1.1608 +  case SCTP_ADDR_ADDED:
  1.1609 +    LOG(("SCTP_ADDR_ADDED"));
  1.1610 +    break;
  1.1611 +  case SCTP_ADDR_MADE_PRIM:
  1.1612 +    LOG(("SCTP_ADDR_MADE_PRIM"));
  1.1613 +    break;
  1.1614 +  case SCTP_ADDR_CONFIRMED:
  1.1615 +    LOG(("SCTP_ADDR_CONFIRMED"));
  1.1616 +    break;
  1.1617 +  default:
  1.1618 +    LOG(("UNKNOWN"));
  1.1619 +    break;
  1.1620 +  }
  1.1621 +  LOG((" (error = 0x%08x).\n", spc->spc_error));
  1.1622 +}
  1.1623 +
  1.1624 +void
  1.1625 +DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
  1.1626 +{
  1.1627 +  size_t i, n;
  1.1628 +
  1.1629 +  n = sre->sre_length - sizeof(struct sctp_remote_error);
  1.1630 +  LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
  1.1631 +  for (i = 0; i < n; ++i) {
  1.1632 +    LOG((" 0x%02x", sre-> sre_data[i]));
  1.1633 +  }
  1.1634 +}
  1.1635 +
  1.1636 +void
  1.1637 +DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
  1.1638 +{
  1.1639 +  LOG(("Shutdown event."));
  1.1640 +  /* XXX: notify all channels. */
  1.1641 +  // Attempts to actually send anything will fail
  1.1642 +}
  1.1643 +
  1.1644 +void
  1.1645 +DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
  1.1646 +{
  1.1647 +  LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
  1.1648 +}
  1.1649 +
  1.1650 +void
  1.1651 +DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
  1.1652 +{
  1.1653 +  size_t i, n;
  1.1654 +
  1.1655 +  if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
  1.1656 +    LOG(("Unsent "));
  1.1657 +  }
  1.1658 +   if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
  1.1659 +    LOG(("Sent "));
  1.1660 +  }
  1.1661 +  if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
  1.1662 +    LOG(("(flags = %x) ", ssfe->ssfe_flags));
  1.1663 +  }
  1.1664 +  LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
  1.1665 +       ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
  1.1666 +       ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
  1.1667 +  n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
  1.1668 +  for (i = 0; i < n; ++i) {
  1.1669 +    LOG((" 0x%02x", ssfe->ssfe_data[i]));
  1.1670 +  }
  1.1671 +}
  1.1672 +
  1.1673 +void
  1.1674 +DataChannelConnection::ClearResets()
  1.1675 +{
  1.1676 +  // Clear all pending resets
  1.1677 +  if (!mStreamsResetting.IsEmpty()) {
  1.1678 +    LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
  1.1679 +  }
  1.1680 +
  1.1681 +  for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
  1.1682 +    nsRefPtr<DataChannel> channel;
  1.1683 +    channel = FindChannelByStream(mStreamsResetting[i]);
  1.1684 +    if (channel) {
  1.1685 +      LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
  1.1686 +      mStreams[channel->mStream] = nullptr;
  1.1687 +    }
  1.1688 +  }
  1.1689 +  mStreamsResetting.Clear();
  1.1690 +}
  1.1691 +
  1.1692 +void
  1.1693 +DataChannelConnection::ResetOutgoingStream(uint16_t stream)
  1.1694 +{
  1.1695 +  uint32_t i;
  1.1696 +
  1.1697 +  mLock.AssertCurrentThreadOwns();
  1.1698 +  LOG(("Connection %p: Resetting outgoing stream %u",
  1.1699 +       (void *) this, stream));
  1.1700 +  // Rarely has more than a couple items and only for a short time
  1.1701 +  for (i = 0; i < mStreamsResetting.Length(); ++i) {
  1.1702 +    if (mStreamsResetting[i] == stream) {
  1.1703 +      return;
  1.1704 +    }
  1.1705 +  }
  1.1706 +  mStreamsResetting.AppendElement(stream);
  1.1707 +}
  1.1708 +
  1.1709 +void
  1.1710 +DataChannelConnection::SendOutgoingStreamReset()
  1.1711 +{
  1.1712 +  struct sctp_reset_streams *srs;
  1.1713 +  uint32_t i;
  1.1714 +  size_t len;
  1.1715 +
  1.1716 +  LOG(("Connection %p: Sending outgoing stream reset for %d streams",
  1.1717 +       (void *) this, mStreamsResetting.Length()));
  1.1718 +  mLock.AssertCurrentThreadOwns();
  1.1719 +  if (mStreamsResetting.IsEmpty()) {
  1.1720 +    LOG(("No streams to reset"));
  1.1721 +    return;
  1.1722 +  }
  1.1723 +  len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
  1.1724 +  srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
  1.1725 +  memset(srs, 0, len);
  1.1726 +  srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
  1.1727 +  srs->srs_number_streams = mStreamsResetting.Length();
  1.1728 +  for (i = 0; i < mStreamsResetting.Length(); ++i) {
  1.1729 +    srs->srs_stream_list[i] = mStreamsResetting[i];
  1.1730 +  }
  1.1731 +  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
  1.1732 +    LOG(("***failed: setsockopt RESET, errno %d", errno));
  1.1733 +    // if errno == EALREADY, this is normal - we can't send another reset
  1.1734 +    // with one pending.
  1.1735 +    // When we get an incoming reset (which may be a response to our
  1.1736 +    // outstanding one), see if we have any pending outgoing resets and
  1.1737 +    // send them
  1.1738 +  } else {
  1.1739 +    mStreamsResetting.Clear();
  1.1740 +  }
  1.1741 +  moz_free(srs);
  1.1742 +}
  1.1743 +
  1.1744 +void
  1.1745 +DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
  1.1746 +{
  1.1747 +  uint32_t n, i;
  1.1748 +  nsRefPtr<DataChannel> channel; // since we may null out the ref to the channel
  1.1749 +
  1.1750 +  if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
  1.1751 +      !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
  1.1752 +    n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
  1.1753 +    for (i = 0; i < n; ++i) {
  1.1754 +      if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
  1.1755 +        channel = FindChannelByStream(strrst->strreset_stream_list[i]);
  1.1756 +        if (channel) {
  1.1757 +          // The other side closed the channel
  1.1758 +          // We could be in three states:
  1.1759 +          // 1. Normal state (input and output streams (OPEN)
  1.1760 +          //    Notify application, send a RESET in response on our
  1.1761 +          //    outbound channel.  Go to CLOSED
  1.1762 +          // 2. We sent our own reset (CLOSING); either they crossed on the
  1.1763 +          //    wire, or this is a response to our Reset.
  1.1764 +          //    Go to CLOSED
  1.1765 +          // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
  1.1766 +          //    I believe this is impossible, as we don't have an input stream yet.
  1.1767 +
  1.1768 +          LOG(("Incoming: Channel %u  closed, state %d",
  1.1769 +               channel->mStream, channel->mState));
  1.1770 +          ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
  1.1771 +                        channel->mState == DataChannel::CLOSING ||
  1.1772 +                        channel->mState == DataChannel::CONNECTING ||
  1.1773 +                        channel->mState == DataChannel::WAITING_TO_OPEN);
  1.1774 +          if (channel->mState == DataChannel::OPEN ||
  1.1775 +              channel->mState == DataChannel::WAITING_TO_OPEN) {
  1.1776 +            ResetOutgoingStream(channel->mStream);
  1.1777 +            SendOutgoingStreamReset();
  1.1778 +            NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.1779 +                                      DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1.1780 +                                      channel));
  1.1781 +          }
  1.1782 +          mStreams[channel->mStream] = nullptr;
  1.1783 +
  1.1784 +          LOG(("Disconnected DataChannel %p from connection %p",
  1.1785 +               (void *) channel.get(), (void *) channel->mConnection.get()));
  1.1786 +          channel->Destroy();
  1.1787 +          // At this point when we leave here, the object is a zombie held alive only by the DOM object
  1.1788 +        } else {
  1.1789 +          LOG(("Can't find incoming channel %d",i));
  1.1790 +        }
  1.1791 +      }
  1.1792 +    }
  1.1793 +  }
  1.1794 +
  1.1795 +  // In case we failed to send a RESET due to having one outstanding, process any pending resets now:
  1.1796 +  if (!mStreamsResetting.IsEmpty()) {
  1.1797 +    LOG(("Sending %d pending resets", mStreamsResetting.Length()));
  1.1798 +    SendOutgoingStreamReset();
  1.1799 +  }
  1.1800 +}
  1.1801 +
  1.1802 +void
  1.1803 +DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
  1.1804 +{
  1.1805 +  uint16_t stream;
  1.1806 +  uint32_t i;
  1.1807 +  nsRefPtr<DataChannel> channel;
  1.1808 +
  1.1809 +  if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
  1.1810 +    LOG(("*** Failed increasing number of streams from %u (%u/%u)",
  1.1811 +         mStreams.Length(),
  1.1812 +         strchg->strchange_instrms,
  1.1813 +         strchg->strchange_outstrms));
  1.1814 +    // XXX FIX! notify pending opens of failure
  1.1815 +    return;
  1.1816 +  } else {
  1.1817 +    if (strchg->strchange_instrms > mStreams.Length()) {
  1.1818 +      LOG(("Other side increased streams from %u to %u",
  1.1819 +           mStreams.Length(), strchg->strchange_instrms));
  1.1820 +    }
  1.1821 +    if (strchg->strchange_outstrms > mStreams.Length() ||
  1.1822 +        strchg->strchange_instrms > mStreams.Length()) {
  1.1823 +      uint16_t old_len = mStreams.Length();
  1.1824 +      uint16_t new_len = std::max(strchg->strchange_outstrms,
  1.1825 +                                  strchg->strchange_instrms);
  1.1826 +      LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
  1.1827 +           old_len, new_len, new_len - old_len,
  1.1828 +           strchg->strchange_instrms));
  1.1829 +      // make sure both are the same length
  1.1830 +      mStreams.AppendElements(new_len - old_len);
  1.1831 +      LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
  1.1832 +      for (size_t i = old_len; i < mStreams.Length(); ++i) {
  1.1833 +        mStreams[i] = nullptr;
  1.1834 +      }
  1.1835 +      // Re-process any channels waiting for streams.
  1.1836 +      // Linear search, but we don't increase channels often and
  1.1837 +      // the array would only get long in case of an app error normally
  1.1838 +
  1.1839 +      // Make sure we request enough streams if there's a big jump in streams
  1.1840 +      // Could make a more complex API for OpenXxxFinish() and avoid this loop
  1.1841 +      int32_t num_needed = mPending.GetSize();
  1.1842 +      LOG(("%d of %d new streams already needed", num_needed,
  1.1843 +           new_len - old_len));
  1.1844 +      num_needed -= (new_len - old_len); // number we added
  1.1845 +      if (num_needed > 0) {
  1.1846 +        if (num_needed < 16)
  1.1847 +          num_needed = 16;
  1.1848 +        LOG(("Not enough new streams, asking for %d more", num_needed));
  1.1849 +        RequestMoreStreams(num_needed);
  1.1850 +      } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
  1.1851 +        LOG(("Requesting %d output streams to match partner",
  1.1852 +             strchg->strchange_instrms - strchg->strchange_outstrms));
  1.1853 +        RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
  1.1854 +      }
  1.1855 +
  1.1856 +      ProcessQueuedOpens();
  1.1857 +    }
  1.1858 +    // else probably not a change in # of streams
  1.1859 +  }
  1.1860 +
  1.1861 +  for (i = 0; i < mStreams.Length(); ++i) {
  1.1862 +    channel = mStreams[i];
  1.1863 +    if (!channel)
  1.1864 +      continue;
  1.1865 +
  1.1866 +    if ((channel->mState == CONNECTING) &&
  1.1867 +        (channel->mStream == INVALID_STREAM)) {
  1.1868 +      if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
  1.1869 +          (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
  1.1870 +        /* XXX: Signal to the other end. */
  1.1871 +        channel->mState = CLOSED;
  1.1872 +        NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.1873 +                                  DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1.1874 +                                  channel));
  1.1875 +        // maybe fire onError (bug 843625)
  1.1876 +      } else {
  1.1877 +        stream = FindFreeStream();
  1.1878 +        if (stream != INVALID_STREAM) {
  1.1879 +          channel->mStream = stream;
  1.1880 +          mStreams[stream] = channel;
  1.1881 +          channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
  1.1882 +          /// XXX fix
  1.1883 +          StartDefer();
  1.1884 +        } else {
  1.1885 +          /* We will not find more ... */
  1.1886 +          break;
  1.1887 +        }
  1.1888 +      }
  1.1889 +    }
  1.1890 +  }
  1.1891 +}
  1.1892 +
  1.1893 +
  1.1894 +// Called with mLock locked!
  1.1895 +void
  1.1896 +DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
  1.1897 +{
  1.1898 +  mLock.AssertCurrentThreadOwns();
  1.1899 +  if (notif->sn_header.sn_length != (uint32_t)n) {
  1.1900 +    return;
  1.1901 +  }
  1.1902 +  switch (notif->sn_header.sn_type) {
  1.1903 +  case SCTP_ASSOC_CHANGE:
  1.1904 +    HandleAssociationChangeEvent(&(notif->sn_assoc_change));
  1.1905 +    break;
  1.1906 +  case SCTP_PEER_ADDR_CHANGE:
  1.1907 +    HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
  1.1908 +    break;
  1.1909 +  case SCTP_REMOTE_ERROR:
  1.1910 +    HandleRemoteErrorEvent(&(notif->sn_remote_error));
  1.1911 +    break;
  1.1912 +  case SCTP_SHUTDOWN_EVENT:
  1.1913 +    HandleShutdownEvent(&(notif->sn_shutdown_event));
  1.1914 +    break;
  1.1915 +  case SCTP_ADAPTATION_INDICATION:
  1.1916 +    HandleAdaptationIndication(&(notif->sn_adaptation_event));
  1.1917 +    break;
  1.1918 +  case SCTP_PARTIAL_DELIVERY_EVENT:
  1.1919 +    LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
  1.1920 +    break;
  1.1921 +  case SCTP_AUTHENTICATION_EVENT:
  1.1922 +    LOG(("SCTP_AUTHENTICATION_EVENT"));
  1.1923 +    break;
  1.1924 +  case SCTP_SENDER_DRY_EVENT:
  1.1925 +    //LOG(("SCTP_SENDER_DRY_EVENT"));
  1.1926 +    break;
  1.1927 +  case SCTP_NOTIFICATIONS_STOPPED_EVENT:
  1.1928 +    LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
  1.1929 +    break;
  1.1930 +  case SCTP_SEND_FAILED_EVENT:
  1.1931 +    HandleSendFailedEvent(&(notif->sn_send_failed_event));
  1.1932 +    break;
  1.1933 +  case SCTP_STREAM_RESET_EVENT:
  1.1934 +    HandleStreamResetEvent(&(notif->sn_strreset_event));
  1.1935 +    break;
  1.1936 +  case SCTP_ASSOC_RESET_EVENT:
  1.1937 +    LOG(("SCTP_ASSOC_RESET_EVENT"));
  1.1938 +    break;
  1.1939 +  case SCTP_STREAM_CHANGE_EVENT:
  1.1940 +    HandleStreamChangeEvent(&(notif->sn_strchange_event));
  1.1941 +    break;
  1.1942 +  default:
  1.1943 +    LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
  1.1944 +    break;
  1.1945 +   }
  1.1946 + }
  1.1947 +
  1.1948 +int
  1.1949 +DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
  1.1950 +                                       struct sctp_rcvinfo rcv, int32_t flags)
  1.1951 +{
  1.1952 +  ASSERT_WEBRTC(!NS_IsMainThread());
  1.1953 +
  1.1954 +  if (!data) {
  1.1955 +    usrsctp_close(sock); // SCTP has finished shutting down
  1.1956 +  } else {
  1.1957 +    MutexAutoLock lock(mLock);
  1.1958 +    if (flags & MSG_NOTIFICATION) {
  1.1959 +      HandleNotification(static_cast<union sctp_notification *>(data), datalen);
  1.1960 +    } else {
  1.1961 +      HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
  1.1962 +    }
  1.1963 +  }
  1.1964 +  // sctp allocates 'data' with malloc(), and expects the receiver to free
  1.1965 +  // it (presumably with free).
  1.1966 +  // XXX future optimization: try to deliver messages without an internal
  1.1967 +  // alloc/copy, and if so delay the free until later.
  1.1968 +  free(data);
  1.1969 +  // usrsctp defines the callback as returning an int, but doesn't use it
  1.1970 +  return 1;
  1.1971 +}
  1.1972 +
  1.1973 +already_AddRefed<DataChannel>
  1.1974 +DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
  1.1975 +                            Type type, bool inOrder,
  1.1976 +                            uint32_t prValue, DataChannelListener *aListener,
  1.1977 +                            nsISupports *aContext, bool aExternalNegotiated,
  1.1978 +                            uint16_t aStream)
  1.1979 +{
  1.1980 +  // aStream == INVALID_STREAM to have the protocol allocate
  1.1981 +  uint16_t prPolicy = SCTP_PR_SCTP_NONE;
  1.1982 +  uint32_t flags;
  1.1983 +
  1.1984 +  LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
  1.1985 +       PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
  1.1986 +       type, inOrder, prValue, aListener, aContext,
  1.1987 +       aExternalNegotiated ? "true" : "false", aStream));
  1.1988 +  switch (type) {
  1.1989 +    case DATA_CHANNEL_RELIABLE:
  1.1990 +      prPolicy = SCTP_PR_SCTP_NONE;
  1.1991 +      break;
  1.1992 +    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
  1.1993 +      prPolicy = SCTP_PR_SCTP_RTX;
  1.1994 +      break;
  1.1995 +    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
  1.1996 +      prPolicy = SCTP_PR_SCTP_TTL;
  1.1997 +      break;
  1.1998 +  }
  1.1999 +  if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
  1.2000 +    return nullptr;
  1.2001 +  }
  1.2002 +
  1.2003 +  // Don't look past currently-negotiated streams
  1.2004 +  if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
  1.2005 +    LOG(("ERROR: external negotiation of already-open channel %u", aStream));
  1.2006 +    // XXX How do we indicate this up to the application?  Probably the
  1.2007 +    // caller's job, but we may need to return an error code.
  1.2008 +    return nullptr;
  1.2009 +  }
  1.2010 +
  1.2011 +  flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
  1.2012 +  nsRefPtr<DataChannel> channel(new DataChannel(this,
  1.2013 +                                                aStream,
  1.2014 +                                                DataChannel::CONNECTING,
  1.2015 +                                                label, protocol,
  1.2016 +                                                type, prValue,
  1.2017 +                                                flags,
  1.2018 +                                                aListener, aContext));
  1.2019 +  if (aExternalNegotiated) {
  1.2020 +    channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
  1.2021 +  }
  1.2022 +
  1.2023 +  MutexAutoLock lock(mLock); // OpenFinish assumes this
  1.2024 +  return OpenFinish(channel.forget());
  1.2025 +}
  1.2026 +
  1.2027 +// Separate routine so we can also call it to finish up from pending opens
  1.2028 +already_AddRefed<DataChannel>
  1.2029 +DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
  1.2030 +{
  1.2031 +  nsRefPtr<DataChannel> channel(aChannel); // takes the reference passed in
  1.2032 +  // Normally 1 reference if called from ::Open(), or 2 if called from
  1.2033 +  // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
  1.2034 +  uint16_t stream = channel->mStream;
  1.2035 +  bool queue = false;
  1.2036 +
  1.2037 +  mLock.AssertCurrentThreadOwns();
  1.2038 +
  1.2039 +  // Cases we care about:
  1.2040 +  // Pre-negotiated:
  1.2041 +  //    Not Open:
  1.2042 +  //      Doesn't fit:
  1.2043 +  //         -> change initial ask or renegotiate after open
  1.2044 +  //      -> queue open
  1.2045 +  //    Open:
  1.2046 +  //      Doesn't fit:
  1.2047 +  //         -> RequestMoreStreams && queue
  1.2048 +  //      Does fit:
  1.2049 +  //         -> open
  1.2050 +  // Not negotiated:
  1.2051 +  //    Not Open:
  1.2052 +  //      -> queue open
  1.2053 +  //    Open:
  1.2054 +  //      -> Try to get a stream
  1.2055 +  //      Doesn't fit:
  1.2056 +  //         -> RequestMoreStreams && queue
  1.2057 +  //      Does fit:
  1.2058 +  //         -> open
  1.2059 +  // So the Open cases are basically the same
  1.2060 +  // Not Open cases are simply queue for non-negotiated, and
  1.2061 +  // either change the initial ask or possibly renegotiate after open.
  1.2062 +
  1.2063 +  if (mState == OPEN) {
  1.2064 +    if (stream == INVALID_STREAM) {
  1.2065 +      stream = FindFreeStream(); // may be INVALID_STREAM if we need more
  1.2066 +    }
  1.2067 +    if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
  1.2068 +      // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
  1.2069 +      // to avoid going back immediately for more if the ask to N, N+1, etc
  1.2070 +      int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
  1.2071 +                            (stream-((int32_t)mStreams.Length())) + 16;
  1.2072 +      if (!RequestMoreStreams(more_needed)) {
  1.2073 +        // Something bad happened... we're done
  1.2074 +        goto request_error_cleanup;
  1.2075 +      }
  1.2076 +      queue = true;
  1.2077 +    }
  1.2078 +  } else {
  1.2079 +    // not OPEN
  1.2080 +    if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
  1.2081 +        mState == CLOSED) {
  1.2082 +      // Update number of streams for init message
  1.2083 +      struct sctp_initmsg initmsg;
  1.2084 +      socklen_t len = sizeof(initmsg);
  1.2085 +      int32_t total_needed = stream+16;
  1.2086 +
  1.2087 +      memset(&initmsg, 0, sizeof(initmsg));
  1.2088 +      if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
  1.2089 +        LOG(("*** failed getsockopt SCTP_INITMSG"));
  1.2090 +        goto request_error_cleanup;
  1.2091 +      }
  1.2092 +      LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
  1.2093 +           initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
  1.2094 +      initmsg.sinit_num_ostreams  = total_needed;
  1.2095 +      initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
  1.2096 +      if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
  1.2097 +                             (socklen_t)sizeof(initmsg)) < 0) {
  1.2098 +        LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
  1.2099 +        goto request_error_cleanup;
  1.2100 +      }
  1.2101 +
  1.2102 +      int32_t old_len = mStreams.Length();
  1.2103 +      mStreams.AppendElements(total_needed - old_len);
  1.2104 +      for (int32_t i = old_len; i < total_needed; ++i) {
  1.2105 +        mStreams[i] = nullptr;
  1.2106 +      }
  1.2107 +    }
  1.2108 +    // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
  1.2109 +    // is called, if needed
  1.2110 +    queue = true;
  1.2111 +  }
  1.2112 +  if (queue) {
  1.2113 +    LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
  1.2114 +    // Also serves to mark we told the app
  1.2115 +    channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
  1.2116 +    channel->AddRef(); // we need a ref for the nsDeQue and one to return
  1.2117 +    mPending.Push(channel);
  1.2118 +    return channel.forget();
  1.2119 +  }
  1.2120 +
  1.2121 +  MOZ_ASSERT(stream != INVALID_STREAM);
  1.2122 +  // just allocated (& OPEN), or externally negotiated
  1.2123 +  mStreams[stream] = channel; // holds a reference
  1.2124 +  channel->mStream = stream;
  1.2125 +
  1.2126 +#ifdef TEST_QUEUED_DATA
  1.2127 +  // It's painful to write a test for this...
  1.2128 +  channel->mState = OPEN;
  1.2129 +  channel->mReady = true;
  1.2130 +  SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
  1.2131 +#endif
  1.2132 +
  1.2133 +  if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
  1.2134 +    // Don't send unordered until this gets cleared
  1.2135 +    channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
  1.2136 +  }
  1.2137 +
  1.2138 +  if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
  1.2139 +    if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
  1.2140 +                                stream,
  1.2141 +                                !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
  1.2142 +                                channel->mPrPolicy, channel->mPrValue)) {
  1.2143 +      LOG(("SendOpenRequest failed, errno = %d", errno));
  1.2144 +      if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1.2145 +        channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
  1.2146 +        StartDefer();
  1.2147 +
  1.2148 +        return channel.forget();
  1.2149 +      } else {
  1.2150 +        if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
  1.2151 +          // We already returned the channel to the app.
  1.2152 +          NS_ERROR("Failed to send open request");
  1.2153 +          NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.2154 +                                    DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1.2155 +                                    channel));
  1.2156 +        }
  1.2157 +        // If we haven't returned the channel yet, it will get destroyed when we exit
  1.2158 +        // this function.
  1.2159 +        mStreams[stream] = nullptr;
  1.2160 +        channel->mStream = INVALID_STREAM;
  1.2161 +        // we'll be destroying the channel
  1.2162 +        channel->mState = CLOSED;
  1.2163 +        return nullptr;
  1.2164 +      }
  1.2165 +      /* NOTREACHED */
  1.2166 +    }
  1.2167 +  }
  1.2168 +  // Either externally negotiated or we sent Open
  1.2169 +  channel->mState = OPEN;
  1.2170 +  channel->mReady = true;
  1.2171 +  // FIX?  Move into DOMDataChannel?  I don't think we can send it yet here
  1.2172 +  LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  1.2173 +  NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.2174 +                            DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
  1.2175 +                            channel));
  1.2176 +
  1.2177 +  return channel.forget();
  1.2178 +
  1.2179 +request_error_cleanup:
  1.2180 +  channel->mState = CLOSED;
  1.2181 +  if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
  1.2182 +    // We already returned the channel to the app.
  1.2183 +    NS_ERROR("Failed to request more streams");
  1.2184 +    NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.2185 +                              DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
  1.2186 +                              channel));
  1.2187 +    return channel.forget();
  1.2188 +  }
  1.2189 +  // we'll be destroying the channel, but it never really got set up
  1.2190 +  // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
  1.2191 +  // Dispatch it to ourselves
  1.2192 +  return nullptr;
  1.2193 +}
  1.2194 +
  1.2195 +int32_t
  1.2196 +DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
  1.2197 +                                       uint32_t length, uint32_t ppid)
  1.2198 +{
  1.2199 +  uint16_t flags;
  1.2200 +  struct sctp_sendv_spa spa;
  1.2201 +  int32_t result;
  1.2202 +
  1.2203 +  NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
  1.2204 +  NS_WARN_IF_FALSE(length > 0, "Length is 0?!");
  1.2205 +
  1.2206 +  // To avoid problems where an in-order OPEN is lost and an
  1.2207 +  // out-of-order data message "beats" it, require data to be in-order
  1.2208 +  // until we get an ACK.
  1.2209 +  if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
  1.2210 +      !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
  1.2211 +    flags = SCTP_UNORDERED;
  1.2212 +  } else {
  1.2213 +    flags = 0;
  1.2214 +  }
  1.2215 +
  1.2216 +  spa.sendv_sndinfo.snd_ppid = htonl(ppid);
  1.2217 +  spa.sendv_sndinfo.snd_sid = channel->mStream;
  1.2218 +  spa.sendv_sndinfo.snd_flags = flags;
  1.2219 +  spa.sendv_sndinfo.snd_context = 0;
  1.2220 +  spa.sendv_sndinfo.snd_assoc_id = 0;
  1.2221 +  spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
  1.2222 +
  1.2223 +  if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
  1.2224 +    spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
  1.2225 +    spa.sendv_prinfo.pr_value = channel->mPrValue;
  1.2226 +    spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
  1.2227 +  }
  1.2228 +
  1.2229 +  // Note: Main-thread IO, but doesn't block!
  1.2230 +  // XXX FIX!  to deal with heavy overruns of JS trying to pass data in
  1.2231 +  // (more than the buffersize) queue data onto another thread to do the
  1.2232 +  // actual sends.  See netwerk/protocol/websocket/WebSocketChannel.cpp
  1.2233 +
  1.2234 +  // SCTP will return EMSGSIZE if the message is bigger than the buffer
  1.2235 +  // size (or EAGAIN if there isn't space)
  1.2236 +  if (channel->mBufferedData.IsEmpty()) {
  1.2237 +    result = usrsctp_sendv(mSocket, data, length,
  1.2238 +                           nullptr, 0,
  1.2239 +                           (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
  1.2240 +                           SCTP_SENDV_SPA, 0);
  1.2241 +    LOG(("Sent buffer (len=%u), result=%d", length, result));
  1.2242 +  } else {
  1.2243 +    // Fake EAGAIN if we're already buffering data
  1.2244 +    result = -1;
  1.2245 +    errno = EAGAIN;
  1.2246 +  }
  1.2247 +  if (result < 0) {
  1.2248 +    if (errno == EAGAIN || errno == EWOULDBLOCK) {
  1.2249 +      // queue data for resend!  And queue any further data for the stream until it is...
  1.2250 +      BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc
  1.2251 +      channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
  1.2252 +      channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
  1.2253 +      LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
  1.2254 +      StartDefer();
  1.2255 +      return 0;
  1.2256 +    }
  1.2257 +    LOG(("error %d sending string", errno));
  1.2258 +  }
  1.2259 +  return result;
  1.2260 +}
  1.2261 +
  1.2262 +// Handles fragmenting binary messages
  1.2263 +int32_t
  1.2264 +DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
  1.2265 +                                  uint32_t len,
  1.2266 +                                  uint32_t ppid_partial, uint32_t ppid_final)
  1.2267 +{
  1.2268 +  // Since there's a limit on network buffer size and no limits on message
  1.2269 +  // size, and we don't want to use EOR mode (multiple writes for a
  1.2270 +  // message, but all other streams are blocked until you finish sending
  1.2271 +  // this message), we need to add application-level fragmentation of large
  1.2272 +  // messages.  On a reliable channel, these can be simply rebuilt into a
  1.2273 +  // large message.  On an unreliable channel, we can't and don't know how
  1.2274 +  // long to wait, and there are no retransmissions, and no easy way to
  1.2275 +  // tell the user "this part is missing", so on unreliable channels we
  1.2276 +  // need to return an error if sending more bytes than the network buffers
  1.2277 +  // can hold, and perhaps a lower number.
  1.2278 +
  1.2279 +  // We *really* don't want to do this from main thread! - and SendMsgInternal
  1.2280 +  // avoids blocking.
  1.2281 +  // This MUST be reliable and in-order for the reassembly to work
  1.2282 +  if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
  1.2283 +      channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
  1.2284 +      !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
  1.2285 +    int32_t sent=0;
  1.2286 +    uint32_t origlen = len;
  1.2287 +    LOG(("Sending binary message length %u in chunks", len));
  1.2288 +    // XXX check flags for out-of-order, or force in-order for large binary messages
  1.2289 +    while (len > 0) {
  1.2290 +      uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
  1.2291 +      uint32_t ppid;
  1.2292 +      len -= sendlen;
  1.2293 +      ppid = len > 0 ? ppid_partial : ppid_final;
  1.2294 +      LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
  1.2295 +      // Note that these might end up being deferred and queued.
  1.2296 +      sent += SendMsgInternal(channel, data, sendlen, ppid);
  1.2297 +      data += sendlen;
  1.2298 +    }
  1.2299 +    LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
  1.2300 +         (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
  1.2301 +         origlen, sent,
  1.2302 +         channel->mBufferedData.Length()));
  1.2303 +    return sent;
  1.2304 +  }
  1.2305 +  NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
  1.2306 +                   "Sending too-large data on unreliable channel!");
  1.2307 +
  1.2308 +  // This will fail if the message is too large (default 256K)
  1.2309 +  return SendMsgInternal(channel, data, len, ppid_final);
  1.2310 +}
  1.2311 +
  1.2312 +class ReadBlobRunnable : public nsRunnable {
  1.2313 +public:
  1.2314 +  ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
  1.2315 +    nsIInputStream* aBlob) :
  1.2316 +    mConnection(aConnection),
  1.2317 +    mStream(aStream),
  1.2318 +    mBlob(aBlob)
  1.2319 +  { }
  1.2320 +
  1.2321 +  NS_IMETHODIMP Run() {
  1.2322 +    // ReadBlob() is responsible to releasing the reference
  1.2323 +    DataChannelConnection *self = mConnection;
  1.2324 +    self->ReadBlob(mConnection.forget(), mStream, mBlob);
  1.2325 +    return NS_OK;
  1.2326 +  }
  1.2327 +
  1.2328 +private:
  1.2329 +  // Make sure the Connection doesn't die while there are jobs outstanding.
  1.2330 +  // Let it die (if released by PeerConnectionImpl while we're running)
  1.2331 +  // when we send our runnable back to MainThread.  Then ~DataChannelConnection
  1.2332 +  // can send the IOThread to MainThread to die in a runnable, avoiding
  1.2333 +  // unsafe event loop recursion.  Evil.
  1.2334 +  nsRefPtr<DataChannelConnection> mConnection;
  1.2335 +  uint16_t mStream;
  1.2336 +  // Use RefCount for preventing the object is deleted when SendBlob returns.
  1.2337 +  nsRefPtr<nsIInputStream> mBlob;
  1.2338 +};
  1.2339 +
  1.2340 +int32_t
  1.2341 +DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
  1.2342 +{
  1.2343 +  DataChannel *channel = mStreams[stream];
  1.2344 +  NS_ENSURE_TRUE(channel, 0);
  1.2345 +  // Spawn a thread to send the data
  1.2346 +  if (!mInternalIOThread) {
  1.2347 +    nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
  1.2348 +    if (NS_FAILED(res)) {
  1.2349 +      return -1;
  1.2350 +    }
  1.2351 +  }
  1.2352 +
  1.2353 +  nsCOMPtr<nsIRunnable> runnable = new ReadBlobRunnable(this, stream, aBlob);
  1.2354 +  mInternalIOThread->Dispatch(runnable, NS_DISPATCH_NORMAL);
  1.2355 +  return 0;
  1.2356 +}
  1.2357 +
  1.2358 +void
  1.2359 +DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
  1.2360 +                                uint16_t aStream, nsIInputStream* aBlob)
  1.2361 +{
  1.2362 +  // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
  1.2363 +  // it off mainthread; if PeerConnectionImpl has released then we want
  1.2364 +  // ~DataChannelConnection() to run on MainThread
  1.2365 +
  1.2366 +  // XXX to do this safely, we must enqueue these atomically onto the
  1.2367 +  // output socket.  We need a sender thread(s?) to enque data into the
  1.2368 +  // socket and to avoid main-thread IO that might block.  Even on a
  1.2369 +  // background thread, we may not want to block on one stream's data.
  1.2370 +  // I.e. run non-blocking and service multiple channels.
  1.2371 +
  1.2372 +  // For now as a hack, send as a single blast of queued packets which may
  1.2373 +  // be deferred until buffer space is available.
  1.2374 +  nsCString temp;
  1.2375 +  uint64_t len;
  1.2376 +  nsCOMPtr<nsIThread> mainThread;
  1.2377 +  NS_GetMainThread(getter_AddRefs(mainThread));
  1.2378 +
  1.2379 +  if (NS_FAILED(aBlob->Available(&len)) ||
  1.2380 +      NS_FAILED(NS_ReadInputStreamToString(aBlob, temp, len))) {
  1.2381 +    // Bug 966602:  Doesn't return an error to the caller via onerror.
  1.2382 +    // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
  1.2383 +    NS_ProxyRelease(mainThread, aThis.take());
  1.2384 +    return;
  1.2385 +  }
  1.2386 +  aBlob->Close();
  1.2387 +  RUN_ON_THREAD(mainThread, WrapRunnable(nsRefPtr<DataChannelConnection>(aThis),
  1.2388 +                               &DataChannelConnection::SendBinaryMsg,
  1.2389 +                               aStream, temp),
  1.2390 +                NS_DISPATCH_NORMAL);
  1.2391 +}
  1.2392 +
  1.2393 +void
  1.2394 +DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
  1.2395 +{
  1.2396 +  ASSERT_WEBRTC(NS_IsMainThread());
  1.2397 +  for (uint32_t i = 0; i < mStreams.Length(); ++i) {
  1.2398 +    if (mStreams[i]) {
  1.2399 +      aStreamList->push_back(mStreams[i]->mStream);
  1.2400 +    }
  1.2401 +  }
  1.2402 +}
  1.2403 +
  1.2404 +int32_t
  1.2405 +DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
  1.2406 +                                     bool isBinary)
  1.2407 +{
  1.2408 +  ASSERT_WEBRTC(NS_IsMainThread());
  1.2409 +  // We really could allow this from other threads, so long as we deal with
  1.2410 +  // asynchronosity issues with channels closing, in particular access to
  1.2411 +  // mStreams, and issues with the association closing (access to mSocket).
  1.2412 +
  1.2413 +  const char *data = aMsg.BeginReading();
  1.2414 +  uint32_t len     = aMsg.Length();
  1.2415 +  DataChannel *channel;
  1.2416 +
  1.2417 +  LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
  1.2418 +  // XXX if we want more efficiency, translate flags once at open time
  1.2419 +  channel = mStreams[stream];
  1.2420 +  NS_ENSURE_TRUE(channel, 0);
  1.2421 +
  1.2422 +  if (isBinary)
  1.2423 +    return SendBinary(channel, data, len,
  1.2424 +                      DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
  1.2425 +  return SendBinary(channel, data, len,
  1.2426 +                    DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
  1.2427 +}
  1.2428 +
  1.2429 +void
  1.2430 +DataChannelConnection::Close(DataChannel *aChannel)
  1.2431 +{
  1.2432 +  MutexAutoLock lock(mLock);
  1.2433 +  CloseInt(aChannel);
  1.2434 +}
  1.2435 +
  1.2436 +// So we can call Close() with the lock already held
  1.2437 +// Called from someone who holds a ref via ::Close(), or from ~DataChannel
  1.2438 +void
  1.2439 +DataChannelConnection::CloseInt(DataChannel *aChannel)
  1.2440 +{
  1.2441 +  MOZ_ASSERT(aChannel);
  1.2442 +  nsRefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
  1.2443 +
  1.2444 +  mLock.AssertCurrentThreadOwns();
  1.2445 +  LOG(("Connection %p/Channel %p: Closing stream %u",
  1.2446 +       channel->mConnection.get(), channel.get(), channel->mStream));
  1.2447 +  // re-test since it may have closed before the lock was grabbed
  1.2448 +  if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
  1.2449 +    LOG(("Channel already closing/closed (%u)", aChannel->mState));
  1.2450 +    if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
  1.2451 +      // called from CloseAll()
  1.2452 +      // we're not going to hang around waiting any more
  1.2453 +      mStreams[channel->mStream] = nullptr;
  1.2454 +    }
  1.2455 +    return;
  1.2456 +  }
  1.2457 +  aChannel->mBufferedData.Clear();
  1.2458 +  if (channel->mStream != INVALID_STREAM) {
  1.2459 +    ResetOutgoingStream(channel->mStream);
  1.2460 +    if (mState == CLOSED) { // called from CloseAll()
  1.2461 +      // Let resets accumulate then send all at once in CloseAll()
  1.2462 +      // we're not going to hang around waiting
  1.2463 +      mStreams[channel->mStream] = nullptr;
  1.2464 +    } else {
  1.2465 +      SendOutgoingStreamReset();
  1.2466 +    }
  1.2467 +  }
  1.2468 +  aChannel->mState = CLOSING;
  1.2469 +  if (mState == CLOSED) {
  1.2470 +    // we're not going to hang around waiting
  1.2471 +    channel->Destroy();
  1.2472 +  }
  1.2473 +  // At this point when we leave here, the object is a zombie held alive only by the DOM object
  1.2474 +}
  1.2475 +
  1.2476 +void DataChannelConnection::CloseAll()
  1.2477 +{
  1.2478 +  LOG(("Closing all channels (connection %p)", (void*) this));
  1.2479 +  // Don't need to lock here
  1.2480 +
  1.2481 +  // Make sure no more channels will be opened
  1.2482 +  {
  1.2483 +    MutexAutoLock lock(mLock);
  1.2484 +    mState = CLOSED;
  1.2485 +  }
  1.2486 +
  1.2487 +  // Close current channels
  1.2488 +  // If there are runnables, they hold a strong ref and keep the channel
  1.2489 +  // and/or connection alive (even if in a CLOSED state)
  1.2490 +  bool closed_some = false;
  1.2491 +  for (uint32_t i = 0; i < mStreams.Length(); ++i) {
  1.2492 +    if (mStreams[i]) {
  1.2493 +      mStreams[i]->Close();
  1.2494 +      closed_some = true;
  1.2495 +    }
  1.2496 +  }
  1.2497 +
  1.2498 +  // Clean up any pending opens for channels
  1.2499 +  nsRefPtr<DataChannel> channel;
  1.2500 +  while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
  1.2501 +    LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
  1.2502 +    channel->Close(); // also releases the ref on each iteration
  1.2503 +    closed_some = true;
  1.2504 +  }
  1.2505 +  // It's more efficient to let the Resets queue in shutdown and then
  1.2506 +  // SendOutgoingStreamReset() here.
  1.2507 +  if (closed_some) {
  1.2508 +    MutexAutoLock lock(mLock);
  1.2509 +    SendOutgoingStreamReset();
  1.2510 +  }
  1.2511 +}
  1.2512 +
  1.2513 +DataChannel::~DataChannel()
  1.2514 +{
  1.2515 +  // NS_ASSERTION since this is more "I think I caught all the cases that
  1.2516 +  // can cause this" than a true kill-the-program assertion.  If this is
  1.2517 +  // wrong, nothing bad happens.  A worst it's a leak.
  1.2518 +  NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
  1.2519 +}
  1.2520 +
  1.2521 +void
  1.2522 +DataChannel::Close()
  1.2523 +{
  1.2524 +  ENSURE_DATACONNECTION;
  1.2525 +  mConnection->Close(this);
  1.2526 +}
  1.2527 +
  1.2528 +// Used when disconnecting from the DataChannelConnection
  1.2529 +void
  1.2530 +DataChannel::Destroy()
  1.2531 +{
  1.2532 +  ENSURE_DATACONNECTION;
  1.2533 +
  1.2534 +  LOG(("Destroying Data channel %u", mStream));
  1.2535 +  MOZ_ASSERT_IF(mStream != INVALID_STREAM,
  1.2536 +                !mConnection->FindChannelByStream(mStream));
  1.2537 +  mStream = INVALID_STREAM;
  1.2538 +  mState = CLOSED;
  1.2539 +  mConnection = nullptr;
  1.2540 +}
  1.2541 +
  1.2542 +void
  1.2543 +DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
  1.2544 +{
  1.2545 +  MutexAutoLock mLock(mListenerLock);
  1.2546 +  mContext = aContext;
  1.2547 +  mListener = aListener;
  1.2548 +}
  1.2549 +
  1.2550 +// May be called from another (i.e. Main) thread!
  1.2551 +void
  1.2552 +DataChannel::AppReady()
  1.2553 +{
  1.2554 +  ENSURE_DATACONNECTION;
  1.2555 +
  1.2556 +  MutexAutoLock lock(mConnection->mLock);
  1.2557 +
  1.2558 +  mReady = true;
  1.2559 +  if (mState == WAITING_TO_OPEN) {
  1.2560 +    mState = OPEN;
  1.2561 +    NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
  1.2562 +                              DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
  1.2563 +                              this));
  1.2564 +    for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
  1.2565 +      nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
  1.2566 +      MOZ_ASSERT(runnable);
  1.2567 +      NS_DispatchToMainThread(runnable);
  1.2568 +    }
  1.2569 +  } else {
  1.2570 +    NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
  1.2571 +  }
  1.2572 +  mQueuedMessages.Clear();
  1.2573 +  mQueuedMessages.Compact();
  1.2574 +  // We never use it again...  We could even allocate the array in the odd
  1.2575 +  // cases we need it.
  1.2576 +}
  1.2577 +
  1.2578 +uint32_t
  1.2579 +DataChannel::GetBufferedAmount()
  1.2580 +{
  1.2581 +  uint32_t buffered = 0;
  1.2582 +  for (uint32_t i = 0; i < mBufferedData.Length(); ++i) {
  1.2583 +    buffered += mBufferedData[i]->mLength;
  1.2584 +  }
  1.2585 +  return buffered;
  1.2586 +}
  1.2587 +
  1.2588 +// Called with mLock locked!
  1.2589 +void
  1.2590 +DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
  1.2591 +{
  1.2592 +  if (!mReady &&
  1.2593 +      (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
  1.2594 +    mQueuedMessages.AppendElement(aMessage);
  1.2595 +  } else {
  1.2596 +    NS_DispatchToMainThread(aMessage);
  1.2597 +  }
  1.2598 +}
  1.2599 +
  1.2600 +} // namespace mozilla

mercurial