netwerk/sctp/datachannel/DataChannel.cpp

Wed, 31 Dec 2014 06:55:46 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:55:46 +0100
changeset 1
ca08bd8f51b2
permissions
-rw-r--r--

Added tag TORBROWSER_REPLICA for changeset 6474c204b198

michael@0 1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
michael@0 2 /* vim: set ts=2 et sw=2 tw=80: */
michael@0 3 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
michael@0 5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 6
michael@0 7 #include <stdio.h>
michael@0 8 #include <stdlib.h>
michael@0 9 #if !defined(__Userspace_os_Windows)
michael@0 10 #include <arpa/inet.h>
michael@0 11 #endif
michael@0 12 // usrsctp.h expects to have errno definitions prior to its inclusion.
michael@0 13 #include <errno.h>
michael@0 14
michael@0 15 #define SCTP_DEBUG 1
michael@0 16 #define SCTP_STDINT_INCLUDE <stdint.h>
michael@0 17
michael@0 18 #ifdef _MSC_VER
michael@0 19 // Disable "warning C4200: nonstandard extension used : zero-sized array in
michael@0 20 // struct/union"
michael@0 21 // ...which the third-party file usrsctp.h runs afoul of.
michael@0 22 #pragma warning(push)
michael@0 23 #pragma warning(disable:4200)
michael@0 24 #endif
michael@0 25
michael@0 26 #include "usrsctp.h"
michael@0 27
michael@0 28 #ifdef _MSC_VER
michael@0 29 #pragma warning(pop)
michael@0 30 #endif
michael@0 31
michael@0 32 #include "DataChannelLog.h"
michael@0 33
michael@0 34 #include "nsServiceManagerUtils.h"
michael@0 35 #include "nsIObserverService.h"
michael@0 36 #include "nsIObserver.h"
michael@0 37 #include "mozilla/Services.h"
michael@0 38 #include "nsProxyRelease.h"
michael@0 39 #include "nsThread.h"
michael@0 40 #include "nsThreadUtils.h"
michael@0 41 #include "nsAutoPtr.h"
michael@0 42 #include "nsNetUtil.h"
michael@0 43 #include "mozilla/StaticPtr.h"
michael@0 44 #ifdef MOZ_PEERCONNECTION
michael@0 45 #include "mtransport/runnable_utils.h"
michael@0 46 #endif
michael@0 47
michael@0 48 #define DATACHANNEL_LOG(args) LOG(args)
michael@0 49 #include "DataChannel.h"
michael@0 50 #include "DataChannelProtocol.h"
michael@0 51
michael@0 52 #ifdef PR_LOGGING
michael@0 53 PRLogModuleInfo*
michael@0 54 GetDataChannelLog()
michael@0 55 {
michael@0 56 static PRLogModuleInfo* sLog;
michael@0 57 if (!sLog)
michael@0 58 sLog = PR_NewLogModule("DataChannel");
michael@0 59 return sLog;
michael@0 60 }
michael@0 61
michael@0 62 PRLogModuleInfo*
michael@0 63 GetSCTPLog()
michael@0 64 {
michael@0 65 static PRLogModuleInfo* sLog;
michael@0 66 if (!sLog)
michael@0 67 sLog = PR_NewLogModule("SCTP");
michael@0 68 return sLog;
michael@0 69 }
michael@0 70 #endif
michael@0 71
michael@0 72 // Let us turn on and off important assertions in non-debug builds
michael@0 73 #ifdef DEBUG
michael@0 74 #define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
michael@0 75 #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
michael@0 76 #define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
michael@0 77 #endif
michael@0 78
michael@0 79 static bool sctp_initialized;
michael@0 80
michael@0 81 namespace mozilla {
michael@0 82
michael@0 83 class DataChannelShutdown;
michael@0 84 StaticRefPtr<DataChannelShutdown> gDataChannelShutdown;
michael@0 85
michael@0 86 class DataChannelShutdown : public nsIObserver
michael@0 87 {
michael@0 88 public:
michael@0 89 // This needs to be tied to some form object that is guaranteed to be
michael@0 90 // around (singleton likely) unless we want to shutdown sctp whenever
michael@0 91 // we're not using it (and in which case we'd keep a refcnt'd object
michael@0 92 // ref'd by each DataChannelConnection to release the SCTP usrlib via
michael@0 93 // sctp_finish)
michael@0 94
michael@0 95 NS_DECL_ISUPPORTS
michael@0 96
michael@0 97 DataChannelShutdown() {}
michael@0 98
michael@0 99 void Init()
michael@0 100 {
michael@0 101 nsCOMPtr<nsIObserverService> observerService =
michael@0 102 mozilla::services::GetObserverService();
michael@0 103 if (!observerService)
michael@0 104 return;
michael@0 105
michael@0 106 nsresult rv = observerService->AddObserver(this,
michael@0 107 "profile-change-net-teardown",
michael@0 108 false);
michael@0 109 MOZ_ASSERT(rv == NS_OK);
michael@0 110 (void) rv;
michael@0 111 }
michael@0 112
michael@0 113 virtual ~DataChannelShutdown()
michael@0 114 {
michael@0 115 nsCOMPtr<nsIObserverService> observerService =
michael@0 116 mozilla::services::GetObserverService();
michael@0 117 if (observerService)
michael@0 118 observerService->RemoveObserver(this, "profile-change-net-teardown");
michael@0 119 }
michael@0 120
michael@0 121 NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic,
michael@0 122 const char16_t* aData) {
michael@0 123 if (strcmp(aTopic, "profile-change-net-teardown") == 0) {
michael@0 124 LOG(("Shutting down SCTP"));
michael@0 125 if (sctp_initialized) {
michael@0 126 usrsctp_finish();
michael@0 127 sctp_initialized = false;
michael@0 128 }
michael@0 129 nsCOMPtr<nsIObserverService> observerService =
michael@0 130 mozilla::services::GetObserverService();
michael@0 131 if (!observerService)
michael@0 132 return NS_ERROR_FAILURE;
michael@0 133
michael@0 134 nsresult rv = observerService->RemoveObserver(this,
michael@0 135 "profile-change-net-teardown");
michael@0 136 MOZ_ASSERT(rv == NS_OK);
michael@0 137 (void) rv;
michael@0 138
michael@0 139 nsRefPtr<DataChannelShutdown> kungFuDeathGrip(this);
michael@0 140 gDataChannelShutdown = nullptr;
michael@0 141 }
michael@0 142 return NS_OK;
michael@0 143 }
michael@0 144 };
michael@0 145
michael@0 146 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
michael@0 147
michael@0 148
michael@0 149 BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
michael@0 150 uint32_t length) : mLength(length)
michael@0 151 {
michael@0 152 mSpa = new sctp_sendv_spa;
michael@0 153 *mSpa = spa;
michael@0 154 char *tmp = new char[length]; // infallible malloc!
michael@0 155 memcpy(tmp, data, length);
michael@0 156 mData = tmp;
michael@0 157 }
michael@0 158
michael@0 159 BufferedMsg::~BufferedMsg()
michael@0 160 {
michael@0 161 delete mSpa;
michael@0 162 delete mData;
michael@0 163 }
michael@0 164
michael@0 165 static int
michael@0 166 receive_cb(struct socket* sock, union sctp_sockstore addr,
michael@0 167 void *data, size_t datalen,
michael@0 168 struct sctp_rcvinfo rcv, int flags, void *ulp_info)
michael@0 169 {
michael@0 170 DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
michael@0 171 return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
michael@0 172 }
michael@0 173
michael@0 174 #ifdef PR_LOGGING
michael@0 175 static void
michael@0 176 debug_printf(const char *format, ...)
michael@0 177 {
michael@0 178 va_list ap;
michael@0 179 char buffer[1024];
michael@0 180
michael@0 181 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) {
michael@0 182 va_start(ap, format);
michael@0 183 #ifdef _WIN32
michael@0 184 if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
michael@0 185 #else
michael@0 186 if (vsnprintf(buffer, sizeof(buffer), format, ap) > 0) {
michael@0 187 #endif
michael@0 188 PR_LogPrint("%s", buffer);
michael@0 189 }
michael@0 190 va_end(ap);
michael@0 191 }
michael@0 192 }
michael@0 193 #endif
michael@0 194
michael@0 195 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
michael@0 196 mLock("netwerk::sctp::DataChannelConnection")
michael@0 197 {
michael@0 198 mState = CLOSED;
michael@0 199 mSocket = nullptr;
michael@0 200 mMasterSocket = nullptr;
michael@0 201 mListener = listener->asWeakPtr();
michael@0 202 mLocalPort = 0;
michael@0 203 mRemotePort = 0;
michael@0 204 mDeferTimeout = 10;
michael@0 205 mTimerRunning = false;
michael@0 206 LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
michael@0 207 mInternalIOThread = nullptr;
michael@0 208 }
michael@0 209
michael@0 210 DataChannelConnection::~DataChannelConnection()
michael@0 211 {
michael@0 212 LOG(("Deleting DataChannelConnection %p", (void *) this));
michael@0 213 // This may die on the MainThread, or on the STS thread
michael@0 214 ASSERT_WEBRTC(mState == CLOSED);
michael@0 215 MOZ_ASSERT(!mMasterSocket);
michael@0 216 MOZ_ASSERT(mPending.GetSize() == 0);
michael@0 217
michael@0 218 // Already disconnected from sigslot/mTransportFlow
michael@0 219 // TransportFlows must be released from the STS thread
michael@0 220 if (!IsSTSThread()) {
michael@0 221 ASSERT_WEBRTC(NS_IsMainThread());
michael@0 222 if (mTransportFlow) {
michael@0 223 ASSERT_WEBRTC(mSTS);
michael@0 224 NS_ProxyRelease(mSTS, mTransportFlow);
michael@0 225 }
michael@0 226
michael@0 227 if (mInternalIOThread) {
michael@0 228 // Avoid spinning the event thread from here (which if we're mainthread
michael@0 229 // is in the event loop already)
michael@0 230 NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
michael@0 231 &nsIThread::Shutdown),
michael@0 232 NS_DISPATCH_NORMAL);
michael@0 233 }
michael@0 234 } else {
michael@0 235 // on STS, safe to call shutdown
michael@0 236 if (mInternalIOThread) {
michael@0 237 mInternalIOThread->Shutdown();
michael@0 238 }
michael@0 239 }
michael@0 240 }
michael@0 241
michael@0 242 void
michael@0 243 DataChannelConnection::Destroy()
michael@0 244 {
michael@0 245 // Though it's probably ok to do this and close the sockets;
michael@0 246 // if we really want it to do true clean shutdowns it can
michael@0 247 // create a dependant Internal object that would remain around
michael@0 248 // until the network shut down the association or timed out.
michael@0 249 LOG(("Destroying DataChannelConnection %p", (void *) this));
michael@0 250 ASSERT_WEBRTC(NS_IsMainThread());
michael@0 251 CloseAll();
michael@0 252
michael@0 253 MutexAutoLock lock(mLock);
michael@0 254 // If we had a pending reset, we aren't waiting for it - clear the list so
michael@0 255 // we can deregister this DataChannelConnection without leaking.
michael@0 256 ClearResets();
michael@0 257
michael@0 258 MOZ_ASSERT(mSTS);
michael@0 259 ASSERT_WEBRTC(NS_IsMainThread());
michael@0 260 // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
michael@0 261 // the usrsctp_close() calls can move back here (and just proxy the
michael@0 262 // disconnect_all())
michael@0 263 RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this),
michael@0 264 &DataChannelConnection::DestroyOnSTS,
michael@0 265 mSocket, mMasterSocket),
michael@0 266 NS_DISPATCH_NORMAL);
michael@0 267
michael@0 268 // These will be released on STS
michael@0 269 mSocket = nullptr;
michael@0 270 mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
michael@0 271
michael@0 272 // Must do this in Destroy() since we may then delete this object
michael@0 273 if (mUsingDtls) {
michael@0 274 usrsctp_deregister_address(static_cast<void *>(this));
michael@0 275 LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
michael@0 276 }
michael@0 277
michael@0 278 // We can't get any more new callbacks from the SCTP library
michael@0 279 // All existing callbacks have refs to DataChannelConnection
michael@0 280
michael@0 281 // nsDOMDataChannel objects have refs to DataChannels that have refs to us
michael@0 282 }
michael@0 283
michael@0 284 void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
michael@0 285 struct socket *aSocket)
michael@0 286 {
michael@0 287 if (aSocket && aSocket != aMasterSocket)
michael@0 288 usrsctp_close(aSocket);
michael@0 289 if (aMasterSocket)
michael@0 290 usrsctp_close(aMasterSocket);
michael@0 291
michael@0 292 disconnect_all();
michael@0 293 }
michael@0 294
michael@0 295 NS_IMPL_ISUPPORTS(DataChannelConnection,
michael@0 296 nsITimerCallback)
michael@0 297
michael@0 298 bool
michael@0 299 DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
michael@0 300 {
michael@0 301 struct sctp_initmsg initmsg;
michael@0 302 struct sctp_udpencaps encaps;
michael@0 303 struct sctp_assoc_value av;
michael@0 304 struct sctp_event event;
michael@0 305 socklen_t len;
michael@0 306
michael@0 307 uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
michael@0 308 SCTP_PEER_ADDR_CHANGE,
michael@0 309 SCTP_REMOTE_ERROR,
michael@0 310 SCTP_SHUTDOWN_EVENT,
michael@0 311 SCTP_ADAPTATION_INDICATION,
michael@0 312 SCTP_SEND_FAILED_EVENT,
michael@0 313 SCTP_STREAM_RESET_EVENT,
michael@0 314 SCTP_STREAM_CHANGE_EVENT};
michael@0 315 {
michael@0 316 ASSERT_WEBRTC(NS_IsMainThread());
michael@0 317
michael@0 318 // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
michael@0 319 if (!sctp_initialized) {
michael@0 320 if (aUsingDtls) {
michael@0 321 LOG(("sctp_init(DTLS)"));
michael@0 322 #ifdef MOZ_PEERCONNECTION
michael@0 323 usrsctp_init(0,
michael@0 324 DataChannelConnection::SctpDtlsOutput,
michael@0 325 #ifdef PR_LOGGING
michael@0 326 debug_printf
michael@0 327 #else
michael@0 328 nullptr
michael@0 329 #endif
michael@0 330 );
michael@0 331 #else
michael@0 332 NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
michael@0 333 #endif
michael@0 334 } else {
michael@0 335 LOG(("sctp_init(%u)", aPort));
michael@0 336 usrsctp_init(aPort,
michael@0 337 nullptr,
michael@0 338 #ifdef PR_LOGGING
michael@0 339 debug_printf
michael@0 340 #else
michael@0 341 nullptr
michael@0 342 #endif
michael@0 343 );
michael@0 344 }
michael@0 345
michael@0 346 #ifdef PR_LOGGING
michael@0 347 // Set logging to SCTP:PR_LOG_DEBUG to get SCTP debugs
michael@0 348 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) {
michael@0 349 usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
michael@0 350 }
michael@0 351 #endif
michael@0 352 usrsctp_sysctl_set_sctp_blackhole(2);
michael@0 353 // ECN is currently not supported by the Firefox code
michael@0 354 usrsctp_sysctl_set_sctp_ecn_enable(0);
michael@0 355 sctp_initialized = true;
michael@0 356
michael@0 357 gDataChannelShutdown = new DataChannelShutdown();
michael@0 358 gDataChannelShutdown->Init();
michael@0 359 }
michael@0 360 }
michael@0 361
michael@0 362 // XXX FIX! make this a global we get once
michael@0 363 // Find the STS thread
michael@0 364 nsresult rv;
michael@0 365 mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
michael@0 366 MOZ_ASSERT(NS_SUCCEEDED(rv));
michael@0 367
michael@0 368 // Open sctp with a callback
michael@0 369 if ((mMasterSocket = usrsctp_socket(
michael@0 370 aUsingDtls ? AF_CONN : AF_INET,
michael@0 371 SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) {
michael@0 372 return false;
michael@0 373 }
michael@0 374
michael@0 375 // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
michael@0 376 // in associations for normal IO
michael@0 377 if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
michael@0 378 LOG(("Couldn't set non_blocking on SCTP socket"));
michael@0 379 // We can't handle connect() safely if it will block, not that this will
michael@0 380 // even happen.
michael@0 381 goto error_cleanup;
michael@0 382 }
michael@0 383
michael@0 384 // Make sure when we close the socket, make sure it doesn't call us back again!
michael@0 385 // This would cause it try to use an invalid DataChannelConnection pointer
michael@0 386 struct linger l;
michael@0 387 l.l_onoff = 1;
michael@0 388 l.l_linger = 0;
michael@0 389 if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
michael@0 390 (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
michael@0 391 LOG(("Couldn't set SO_LINGER on SCTP socket"));
michael@0 392 // unsafe to allow it to continue if this fails
michael@0 393 goto error_cleanup;
michael@0 394 }
michael@0 395
michael@0 396 // XXX Consider disabling this when we add proper SDP negotiation.
michael@0 397 // We may want to leave enabled for supporting 'cloning' of SDP offers, which
michael@0 398 // implies re-use of the same pseudo-port number, or forcing a renegotiation.
michael@0 399 {
michael@0 400 uint32_t on = 1;
michael@0 401 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
michael@0 402 (const void *)&on, (socklen_t)sizeof(on)) < 0) {
michael@0 403 LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
michael@0 404 }
michael@0 405 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
michael@0 406 (const void *)&on, (socklen_t)sizeof(on)) < 0) {
michael@0 407 LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
michael@0 408 }
michael@0 409 }
michael@0 410
michael@0 411 if (!aUsingDtls) {
michael@0 412 memset(&encaps, 0, sizeof(encaps));
michael@0 413 encaps.sue_address.ss_family = AF_INET;
michael@0 414 encaps.sue_port = htons(aPort);
michael@0 415 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
michael@0 416 (const void*)&encaps,
michael@0 417 (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
michael@0 418 LOG(("*** failed encaps errno %d", errno));
michael@0 419 goto error_cleanup;
michael@0 420 }
michael@0 421 LOG(("SCTP encapsulation local port %d", aPort));
michael@0 422 }
michael@0 423
michael@0 424 av.assoc_id = SCTP_ALL_ASSOC;
michael@0 425 av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
michael@0 426 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
michael@0 427 (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
michael@0 428 LOG(("*** failed enable stream reset errno %d", errno));
michael@0 429 goto error_cleanup;
michael@0 430 }
michael@0 431
michael@0 432 /* Enable the events of interest. */
michael@0 433 memset(&event, 0, sizeof(event));
michael@0 434 event.se_assoc_id = SCTP_ALL_ASSOC;
michael@0 435 event.se_on = 1;
michael@0 436 for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) {
michael@0 437 event.se_type = event_types[i];
michael@0 438 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
michael@0 439 LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
michael@0 440 goto error_cleanup;
michael@0 441 }
michael@0 442 }
michael@0 443
michael@0 444 // Update number of streams
michael@0 445 mStreams.AppendElements(aNumStreams);
michael@0 446 for (uint32_t i = 0; i < aNumStreams; ++i) {
michael@0 447 mStreams[i] = nullptr;
michael@0 448 }
michael@0 449 memset(&initmsg, 0, sizeof(initmsg));
michael@0 450 len = sizeof(initmsg);
michael@0 451 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
michael@0 452 LOG(("*** failed getsockopt SCTP_INITMSG"));
michael@0 453 goto error_cleanup;
michael@0 454 }
michael@0 455 LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
michael@0 456 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
michael@0 457 initmsg.sinit_num_ostreams = aNumStreams;
michael@0 458 initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
michael@0 459 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
michael@0 460 (socklen_t)sizeof(initmsg)) < 0) {
michael@0 461 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
michael@0 462 goto error_cleanup;
michael@0 463 }
michael@0 464
michael@0 465 mSocket = nullptr;
michael@0 466 if (aUsingDtls) {
michael@0 467 mUsingDtls = true;
michael@0 468 usrsctp_register_address(static_cast<void *>(this));
michael@0 469 LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
michael@0 470 } else {
michael@0 471 mUsingDtls = false;
michael@0 472 }
michael@0 473 return true;
michael@0 474
michael@0 475 error_cleanup:
michael@0 476 usrsctp_close(mMasterSocket);
michael@0 477 mMasterSocket = nullptr;
michael@0 478 mUsingDtls = false;
michael@0 479 return false;
michael@0 480 }
michael@0 481
michael@0 482 void
michael@0 483 DataChannelConnection::StartDefer()
michael@0 484 {
michael@0 485 nsresult rv;
michael@0 486 if (!NS_IsMainThread()) {
michael@0 487 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 488 DataChannelOnMessageAvailable::START_DEFER,
michael@0 489 this, (DataChannel *) nullptr));
michael@0 490 return;
michael@0 491 }
michael@0 492
michael@0 493 ASSERT_WEBRTC(NS_IsMainThread());
michael@0 494 if (!mDeferredTimer) {
michael@0 495 mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
michael@0 496 MOZ_ASSERT(mDeferredTimer);
michael@0 497 }
michael@0 498
michael@0 499 if (!mTimerRunning) {
michael@0 500 rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
michael@0 501 nsITimer::TYPE_ONE_SHOT);
michael@0 502 NS_ENSURE_TRUE_VOID(rv == NS_OK);
michael@0 503
michael@0 504 mTimerRunning = true;
michael@0 505 }
michael@0 506 }
michael@0 507
michael@0 508 // nsITimerCallback
michael@0 509
michael@0 510 NS_IMETHODIMP
michael@0 511 DataChannelConnection::Notify(nsITimer *timer)
michael@0 512 {
michael@0 513 ASSERT_WEBRTC(NS_IsMainThread());
michael@0 514 LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout));
michael@0 515
michael@0 516 if (timer == mDeferredTimer) {
michael@0 517 if (SendDeferredMessages()) {
michael@0 518 // Still blocked
michael@0 519 // we don't need a lock, since this must be main thread...
michael@0 520 nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
michael@0 521 nsITimer::TYPE_ONE_SHOT);
michael@0 522 if (NS_FAILED(rv)) {
michael@0 523 LOG(("%s: cannot initialize open timer", __FUNCTION__));
michael@0 524 // XXX and do....?
michael@0 525 return rv;
michael@0 526 }
michael@0 527 mTimerRunning = true;
michael@0 528 } else {
michael@0 529 LOG(("Turned off deferred send timer"));
michael@0 530 mTimerRunning = false;
michael@0 531 }
michael@0 532 }
michael@0 533 return NS_OK;
michael@0 534 }
michael@0 535
michael@0 536 #ifdef MOZ_PEERCONNECTION
michael@0 537 void
michael@0 538 DataChannelConnection::SetEvenOdd()
michael@0 539 {
michael@0 540 ASSERT_WEBRTC(IsSTSThread());
michael@0 541
michael@0 542 TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
michael@0 543 mTransportFlow->GetLayer(TransportLayerDtls::ID()));
michael@0 544 MOZ_ASSERT(dtls); // DTLS is mandatory
michael@0 545 mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
michael@0 546 }
michael@0 547
michael@0 548 bool
michael@0 549 DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
michael@0 550 {
michael@0 551 LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
michael@0 552
michael@0 553 NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
michael@0 554 NS_ENSURE_TRUE(aFlow, false);
michael@0 555
michael@0 556 mTransportFlow = aFlow;
michael@0 557 mLocalPort = localport;
michael@0 558 mRemotePort = remoteport;
michael@0 559 mState = CONNECTING;
michael@0 560
michael@0 561 RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this),
michael@0 562 &DataChannelConnection::SetSignals),
michael@0 563 NS_DISPATCH_NORMAL);
michael@0 564 return true;
michael@0 565 }
michael@0 566
michael@0 567 void
michael@0 568 DataChannelConnection::SetSignals()
michael@0 569 {
michael@0 570 ASSERT_WEBRTC(IsSTSThread());
michael@0 571 ASSERT_WEBRTC(mTransportFlow);
michael@0 572 LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
michael@0 573 mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
michael@0 574 // SignalStateChange() doesn't call you with the initial state
michael@0 575 mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
michael@0 576 CompleteConnect(mTransportFlow, mTransportFlow->state());
michael@0 577 }
michael@0 578
michael@0 579 void
michael@0 580 DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
michael@0 581 {
michael@0 582 LOG(("Data transport state: %d", state));
michael@0 583 MutexAutoLock lock(mLock);
michael@0 584 ASSERT_WEBRTC(IsSTSThread());
michael@0 585 // We should abort connection on TS_ERROR.
michael@0 586 // Note however that the association will also fail (perhaps with a delay) and
michael@0 587 // notify us in that way
michael@0 588 if (state != TransportLayer::TS_OPEN || !mMasterSocket)
michael@0 589 return;
michael@0 590
michael@0 591 struct sockaddr_conn addr;
michael@0 592 memset(&addr, 0, sizeof(addr));
michael@0 593 addr.sconn_family = AF_CONN;
michael@0 594 #if defined(__Userspace_os_Darwin)
michael@0 595 addr.sconn_len = sizeof(addr);
michael@0 596 #endif
michael@0 597 addr.sconn_port = htons(mLocalPort);
michael@0 598 addr.sconn_addr = static_cast<void *>(this);
michael@0 599
michael@0 600 LOG(("Calling usrsctp_bind"));
michael@0 601 int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
michael@0 602 sizeof(addr));
michael@0 603 if (r < 0) {
michael@0 604 LOG(("usrsctp_bind failed: %d", r));
michael@0 605 } else {
michael@0 606 // This is the remote addr
michael@0 607 addr.sconn_port = htons(mRemotePort);
michael@0 608 LOG(("Calling usrsctp_connect"));
michael@0 609 r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
michael@0 610 sizeof(addr));
michael@0 611 if (r < 0) {
michael@0 612 if (errno == EINPROGRESS) {
michael@0 613 // non-blocking
michael@0 614 return;
michael@0 615 } else {
michael@0 616 LOG(("usrsctp_connect failed: %d", errno));
michael@0 617 mState = CLOSED;
michael@0 618 }
michael@0 619 } else {
michael@0 620 // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
michael@0 621 // This also avoids issues with calling TransportFlow stuff on Mainthread
michael@0 622 return;
michael@0 623 }
michael@0 624 }
michael@0 625 // Note: currently this doesn't actually notify the application
michael@0 626 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 627 DataChannelOnMessageAvailable::ON_CONNECTION,
michael@0 628 this, false));
michael@0 629 return;
michael@0 630 }
michael@0 631
michael@0 632 // Process any pending Opens
michael@0 633 void
michael@0 634 DataChannelConnection::ProcessQueuedOpens()
michael@0 635 {
michael@0 636 // The nsDeque holds channels with an AddRef applied. Another reference
michael@0 637 // (may) be held by the DOMDataChannel, unless it's been GC'd. No other
michael@0 638 // references should exist.
michael@0 639
michael@0 640 // Can't copy nsDeque's. Move into temp array since any that fail will
michael@0 641 // go back to mPending
michael@0 642 nsDeque temp;
michael@0 643 DataChannel *temp_channel; // really already_AddRefed<>
michael@0 644 while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
michael@0 645 temp.Push(static_cast<void *>(temp_channel));
michael@0 646 }
michael@0 647
michael@0 648 nsRefPtr<DataChannel> channel;
michael@0 649 // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
michael@0 650 while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
michael@0 651 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
michael@0 652 LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
michael@0 653 channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
michael@0 654 // OpenFinish returns a reference itself, so we need to take it can Release it
michael@0 655 channel = OpenFinish(channel.forget()); // may reset the flag and re-push
michael@0 656 } else {
michael@0 657 NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
michael@0 658 }
michael@0 659 }
michael@0 660
michael@0 661 }
michael@0 662 void
michael@0 663 DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
michael@0 664 const unsigned char *data, size_t len)
michael@0 665 {
michael@0 666 #ifdef PR_LOGGING
michael@0 667 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) {
michael@0 668 char *buf;
michael@0 669
michael@0 670 if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
michael@0 671 PR_LogPrint("%s", buf);
michael@0 672 usrsctp_freedumpbuffer(buf);
michael@0 673 }
michael@0 674 }
michael@0 675 #endif
michael@0 676 // Pass the data to SCTP
michael@0 677 usrsctp_conninput(static_cast<void *>(this), data, len, 0);
michael@0 678 }
michael@0 679
michael@0 680 int
michael@0 681 DataChannelConnection::SendPacket(const unsigned char *data, size_t len, bool release)
michael@0 682 {
michael@0 683 //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
michael@0 684 int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
michael@0 685 if (release)
michael@0 686 delete data;
michael@0 687 return res;
michael@0 688 }
michael@0 689
michael@0 690 /* static */
michael@0 691 int
michael@0 692 DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
michael@0 693 uint8_t tos, uint8_t set_df)
michael@0 694 {
michael@0 695 DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
michael@0 696 int res;
michael@0 697
michael@0 698 #ifdef PR_LOGGING
michael@0 699 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) {
michael@0 700 char *buf;
michael@0 701
michael@0 702 if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
michael@0 703 PR_LogPrint("%s", buf);
michael@0 704 usrsctp_freedumpbuffer(buf);
michael@0 705 }
michael@0 706 }
michael@0 707 #endif
michael@0 708 // We're async proxying even if on the STSThread because this is called
michael@0 709 // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
michael@0 710 // SCTP has an option for Apple, on IP connections only, to release at least
michael@0 711 // one of the locks before calling a packet output routine; with changes to
michael@0 712 // the underlying SCTP stack this might remove the need to use an async proxy.
michael@0 713 if (0 /*peer->IsSTSThread()*/) {
michael@0 714 res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
michael@0 715 } else {
michael@0 716 unsigned char *data = new unsigned char[length];
michael@0 717 memcpy(data, buffer, length);
michael@0 718 res = -1;
michael@0 719 // XXX It might be worthwhile to add an assertion against the thread
michael@0 720 // somehow getting into the DataChannel/SCTP code again, as
michael@0 721 // DISPATCH_SYNC is not fully blocking. This may be tricky, as it
michael@0 722 // needs to be a per-thread check, not a global.
michael@0 723 peer->mSTS->Dispatch(WrapRunnable(
michael@0 724 nsRefPtr<DataChannelConnection>(peer),
michael@0 725 &DataChannelConnection::SendPacket, data, length, true),
michael@0 726 NS_DISPATCH_NORMAL);
michael@0 727 res = 0; // cheat! Packets can always be dropped later anyways
michael@0 728 }
michael@0 729 return res;
michael@0 730 }
michael@0 731 #endif
michael@0 732
michael@0 733 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
michael@0 734 // listen for incoming associations
michael@0 735 // Blocks! - Don't call this from main thread!
michael@0 736
michael@0 737 #error This code will not work as-is since SetEvenOdd() runs on Mainthread
michael@0 738
michael@0 739 bool
michael@0 740 DataChannelConnection::Listen(unsigned short port)
michael@0 741 {
michael@0 742 struct sockaddr_in addr;
michael@0 743 socklen_t addr_len;
michael@0 744
michael@0 745 NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
michael@0 746
michael@0 747 /* Acting as the 'server' */
michael@0 748 memset((void *)&addr, 0, sizeof(addr));
michael@0 749 #ifdef HAVE_SIN_LEN
michael@0 750 addr.sin_len = sizeof(struct sockaddr_in);
michael@0 751 #endif
michael@0 752 addr.sin_family = AF_INET;
michael@0 753 addr.sin_port = htons(port);
michael@0 754 addr.sin_addr.s_addr = htonl(INADDR_ANY);
michael@0 755 LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
michael@0 756 mState = CONNECTING;
michael@0 757 if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
michael@0 758 LOG(("***Failed userspace_bind"));
michael@0 759 return false;
michael@0 760 }
michael@0 761 if (usrsctp_listen(mMasterSocket, 1) < 0) {
michael@0 762 LOG(("***Failed userspace_listen"));
michael@0 763 return false;
michael@0 764 }
michael@0 765
michael@0 766 LOG(("Accepting connection"));
michael@0 767 addr_len = 0;
michael@0 768 if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
michael@0 769 LOG(("***Failed accept"));
michael@0 770 return false;
michael@0 771 }
michael@0 772 mState = OPEN;
michael@0 773
michael@0 774 struct linger l;
michael@0 775 l.l_onoff = 1;
michael@0 776 l.l_linger = 0;
michael@0 777 if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
michael@0 778 (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
michael@0 779 LOG(("Couldn't set SO_LINGER on SCTP socket"));
michael@0 780 }
michael@0 781
michael@0 782 SetEvenOdd();
michael@0 783
michael@0 784 // Notify Connection open
michael@0 785 // XXX We need to make sure connection sticks around until the message is delivered
michael@0 786 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
michael@0 787 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 788 DataChannelOnMessageAvailable::ON_CONNECTION,
michael@0 789 this, (DataChannel *) nullptr));
michael@0 790 return true;
michael@0 791 }
michael@0 792
michael@0 793 // Blocks! - Don't call this from main thread!
michael@0 794 bool
michael@0 795 DataChannelConnection::Connect(const char *addr, unsigned short port)
michael@0 796 {
michael@0 797 struct sockaddr_in addr4;
michael@0 798 struct sockaddr_in6 addr6;
michael@0 799
michael@0 800 NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
michael@0 801
michael@0 802 /* Acting as the connector */
michael@0 803 LOG(("Connecting to %s, port %u", addr, port));
michael@0 804 memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
michael@0 805 memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
michael@0 806 #ifdef HAVE_SIN_LEN
michael@0 807 addr4.sin_len = sizeof(struct sockaddr_in);
michael@0 808 #endif
michael@0 809 #ifdef HAVE_SIN6_LEN
michael@0 810 addr6.sin6_len = sizeof(struct sockaddr_in6);
michael@0 811 #endif
michael@0 812 addr4.sin_family = AF_INET;
michael@0 813 addr6.sin6_family = AF_INET6;
michael@0 814 addr4.sin_port = htons(port);
michael@0 815 addr6.sin6_port = htons(port);
michael@0 816 mState = CONNECTING;
michael@0 817
michael@0 818 #if !defined(__Userspace_os_Windows)
michael@0 819 if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
michael@0 820 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
michael@0 821 LOG(("*** Failed userspace_connect"));
michael@0 822 return false;
michael@0 823 }
michael@0 824 } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
michael@0 825 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
michael@0 826 LOG(("*** Failed userspace_connect"));
michael@0 827 return false;
michael@0 828 }
michael@0 829 } else {
michael@0 830 LOG(("*** Illegal destination address."));
michael@0 831 }
michael@0 832 #else
michael@0 833 {
michael@0 834 struct sockaddr_storage ss;
michael@0 835 int sslen = sizeof(ss);
michael@0 836
michael@0 837 if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
michael@0 838 addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
michael@0 839 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
michael@0 840 LOG(("*** Failed userspace_connect"));
michael@0 841 return false;
michael@0 842 }
michael@0 843 } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
michael@0 844 addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
michael@0 845 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
michael@0 846 LOG(("*** Failed userspace_connect"));
michael@0 847 return false;
michael@0 848 }
michael@0 849 } else {
michael@0 850 LOG(("*** Illegal destination address."));
michael@0 851 }
michael@0 852 }
michael@0 853 #endif
michael@0 854
michael@0 855 mSocket = mMasterSocket;
michael@0 856
michael@0 857 LOG(("connect() succeeded! Entering connected mode"));
michael@0 858 mState = OPEN;
michael@0 859
michael@0 860 SetEvenOdd();
michael@0 861
michael@0 862 // Notify Connection open
michael@0 863 // XXX We need to make sure connection sticks around until the message is delivered
michael@0 864 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
michael@0 865 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 866 DataChannelOnMessageAvailable::ON_CONNECTION,
michael@0 867 this, (DataChannel *) nullptr));
michael@0 868 return true;
michael@0 869 }
michael@0 870 #endif
michael@0 871
michael@0 872 DataChannel *
michael@0 873 DataChannelConnection::FindChannelByStream(uint16_t stream)
michael@0 874 {
michael@0 875 return mStreams.SafeElementAt(stream);
michael@0 876 }
michael@0 877
michael@0 878 uint16_t
michael@0 879 DataChannelConnection::FindFreeStream()
michael@0 880 {
michael@0 881 uint32_t i, j, limit;
michael@0 882
michael@0 883 limit = mStreams.Length();
michael@0 884 if (limit > MAX_NUM_STREAMS)
michael@0 885 limit = MAX_NUM_STREAMS;
michael@0 886
michael@0 887 for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
michael@0 888 if (!mStreams[i]) {
michael@0 889 // Verify it's not still in the process of closing
michael@0 890 for (j = 0; j < mStreamsResetting.Length(); ++j) {
michael@0 891 if (mStreamsResetting[j] == i) {
michael@0 892 break;
michael@0 893 }
michael@0 894 }
michael@0 895 if (j == mStreamsResetting.Length())
michael@0 896 break;
michael@0 897 }
michael@0 898 }
michael@0 899 if (i >= limit) {
michael@0 900 return INVALID_STREAM;
michael@0 901 }
michael@0 902 return i;
michael@0 903 }
michael@0 904
michael@0 905 bool
michael@0 906 DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
michael@0 907 {
michael@0 908 struct sctp_status status;
michael@0 909 struct sctp_add_streams sas;
michael@0 910 uint32_t outStreamsNeeded;
michael@0 911 socklen_t len;
michael@0 912
michael@0 913 if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
michael@0 914 aNeeded = MAX_NUM_STREAMS - mStreams.Length();
michael@0 915 }
michael@0 916 if (aNeeded <= 0) {
michael@0 917 return false;
michael@0 918 }
michael@0 919
michael@0 920 len = (socklen_t)sizeof(struct sctp_status);
michael@0 921 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
michael@0 922 LOG(("***failed: getsockopt SCTP_STATUS"));
michael@0 923 return false;
michael@0 924 }
michael@0 925 outStreamsNeeded = aNeeded; // number to add
michael@0 926
michael@0 927 // Note: if multiple channel opens happen when we don't have enough space,
michael@0 928 // we'll call RequestMoreStreams() multiple times
michael@0 929 memset(&sas, 0, sizeof(sas));
michael@0 930 sas.sas_instrms = 0;
michael@0 931 sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
michael@0 932 // Doesn't block, we get an event when it succeeds or fails
michael@0 933 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
michael@0 934 (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
michael@0 935 if (errno == EALREADY) {
michael@0 936 LOG(("Already have %u output streams", outStreamsNeeded));
michael@0 937 return true;
michael@0 938 }
michael@0 939
michael@0 940 LOG(("***failed: setsockopt ADD errno=%d", errno));
michael@0 941 return false;
michael@0 942 }
michael@0 943 LOG(("Requested %u more streams", outStreamsNeeded));
michael@0 944 // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
michael@0 945 // values are larger than mStreams.Length()
michael@0 946 return true;
michael@0 947 }
michael@0 948
michael@0 949 int32_t
michael@0 950 DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
michael@0 951 {
michael@0 952 struct sctp_sndinfo sndinfo;
michael@0 953
michael@0 954 // Note: Main-thread IO, but doesn't block
michael@0 955 memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
michael@0 956 sndinfo.snd_sid = stream;
michael@0 957 sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
michael@0 958 if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
michael@0 959 &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
michael@0 960 SCTP_SENDV_SNDINFO, 0) < 0) {
michael@0 961 //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
michael@0 962 return (0);
michael@0 963 }
michael@0 964 return (1);
michael@0 965 }
michael@0 966
michael@0 967 int32_t
michael@0 968 DataChannelConnection::SendOpenAckMessage(uint16_t stream)
michael@0 969 {
michael@0 970 struct rtcweb_datachannel_ack ack;
michael@0 971
michael@0 972 memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
michael@0 973 ack.msg_type = DATA_CHANNEL_ACK;
michael@0 974
michael@0 975 return SendControlMessage(&ack, sizeof(ack), stream);
michael@0 976 }
michael@0 977
michael@0 978 int32_t
michael@0 979 DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
michael@0 980 const nsACString& protocol,
michael@0 981 uint16_t stream, bool unordered,
michael@0 982 uint16_t prPolicy, uint32_t prValue)
michael@0 983 {
michael@0 984 int label_len = label.Length(); // not including nul
michael@0 985 int proto_len = protocol.Length(); // not including nul
michael@0 986 struct rtcweb_datachannel_open_request *req =
michael@0 987 (struct rtcweb_datachannel_open_request*) moz_xmalloc((sizeof(*req)-1) + label_len + proto_len);
michael@0 988 // careful - request includes 1 char label
michael@0 989
michael@0 990 memset(req, 0, sizeof(struct rtcweb_datachannel_open_request));
michael@0 991 req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
michael@0 992 switch (prPolicy) {
michael@0 993 case SCTP_PR_SCTP_NONE:
michael@0 994 req->channel_type = DATA_CHANNEL_RELIABLE;
michael@0 995 break;
michael@0 996 case SCTP_PR_SCTP_TTL:
michael@0 997 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
michael@0 998 break;
michael@0 999 case SCTP_PR_SCTP_RTX:
michael@0 1000 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
michael@0 1001 break;
michael@0 1002 default:
michael@0 1003 // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
michael@0 1004 moz_free(req);
michael@0 1005 return (0);
michael@0 1006 }
michael@0 1007 if (unordered) {
michael@0 1008 // Per the current types, all differ by 0x80 between ordered and unordered
michael@0 1009 req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
michael@0 1010 }
michael@0 1011
michael@0 1012 req->reliability_param = htonl(prValue);
michael@0 1013 req->priority = htons(0); /* XXX: add support */
michael@0 1014 req->label_length = htons(label_len);
michael@0 1015 req->protocol_length = htons(proto_len);
michael@0 1016 memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
michael@0 1017 memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
michael@0 1018
michael@0 1019 // sizeof(*req) already includes +1 byte for label, need nul for both strings
michael@0 1020 int32_t result = SendControlMessage(req, (sizeof(*req)-1) + label_len + proto_len, stream);
michael@0 1021
michael@0 1022 moz_free(req);
michael@0 1023 return result;
michael@0 1024 }
michael@0 1025
michael@0 1026 // XXX This should use a separate thread (outbound queue) which should
michael@0 1027 // select() to know when to *try* to send data to the socket again.
michael@0 1028 // Alternatively, it can use a timeout, but that's guaranteed to be wrong
michael@0 1029 // (just not sure in what direction). We could re-implement NSPR's
michael@0 1030 // PR_POLL_WRITE/etc handling... with a lot of work.
michael@0 1031
michael@0 1032 // Better yet, use the SCTP stack's notifications on buffer state to avoid
michael@0 1033 // filling the SCTP's buffers.
michael@0 1034
michael@0 1035 // returns if we're still blocked or not
michael@0 1036 bool
michael@0 1037 DataChannelConnection::SendDeferredMessages()
michael@0 1038 {
michael@0 1039 uint32_t i;
michael@0 1040 nsRefPtr<DataChannel> channel; // we may null out the refs to this
michael@0 1041 bool still_blocked = false;
michael@0 1042 bool sent = false;
michael@0 1043
michael@0 1044 // This may block while something is modifying channels, but should not block for IO
michael@0 1045 MutexAutoLock lock(mLock);
michael@0 1046
michael@0 1047 // XXX For total fairness, on a still_blocked we'd start next time at the
michael@0 1048 // same index. Sorry, not going to bother for now.
michael@0 1049 for (i = 0; i < mStreams.Length(); ++i) {
michael@0 1050 channel = mStreams[i];
michael@0 1051 if (!channel)
michael@0 1052 continue;
michael@0 1053
michael@0 1054 // Only one of these should be set....
michael@0 1055 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
michael@0 1056 if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
michael@0 1057 channel->mStream,
michael@0 1058 channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
michael@0 1059 channel->mPrPolicy, channel->mPrValue)) {
michael@0 1060 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
michael@0 1061
michael@0 1062 channel->mState = OPEN;
michael@0 1063 channel->mReady = true;
michael@0 1064 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
michael@0 1065 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 1066 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
michael@0 1067 channel));
michael@0 1068 sent = true;
michael@0 1069 } else {
michael@0 1070 if (errno == EAGAIN || errno == EWOULDBLOCK) {
michael@0 1071 still_blocked = true;
michael@0 1072 } else {
michael@0 1073 // Close the channel, inform the user
michael@0 1074 mStreams[channel->mStream] = nullptr;
michael@0 1075 channel->mState = CLOSED;
michael@0 1076 // Don't need to reset; we didn't open it
michael@0 1077 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 1078 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
michael@0 1079 channel));
michael@0 1080 }
michael@0 1081 }
michael@0 1082 }
michael@0 1083 if (still_blocked)
michael@0 1084 break;
michael@0 1085
michael@0 1086 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
michael@0 1087 if (SendOpenAckMessage(channel->mStream)) {
michael@0 1088 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
michael@0 1089 sent = true;
michael@0 1090 } else {
michael@0 1091 if (errno == EAGAIN || errno == EWOULDBLOCK) {
michael@0 1092 still_blocked = true;
michael@0 1093 } else {
michael@0 1094 // Close the channel, inform the user
michael@0 1095 CloseInt(channel);
michael@0 1096 // XXX send error via DataChannelOnMessageAvailable (bug 843625)
michael@0 1097 }
michael@0 1098 }
michael@0 1099 }
michael@0 1100 if (still_blocked)
michael@0 1101 break;
michael@0 1102
michael@0 1103 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
michael@0 1104 bool failed_send = false;
michael@0 1105 int32_t result;
michael@0 1106
michael@0 1107 if (channel->mState == CLOSED || channel->mState == CLOSING) {
michael@0 1108 channel->mBufferedData.Clear();
michael@0 1109 }
michael@0 1110 while (!channel->mBufferedData.IsEmpty() &&
michael@0 1111 !failed_send) {
michael@0 1112 struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
michael@0 1113 const char *data = channel->mBufferedData[0]->mData;
michael@0 1114 uint32_t len = channel->mBufferedData[0]->mLength;
michael@0 1115
michael@0 1116 // SCTP will return EMSGSIZE if the message is bigger than the buffer
michael@0 1117 // size (or EAGAIN if there isn't space)
michael@0 1118 if ((result = usrsctp_sendv(mSocket, data, len,
michael@0 1119 nullptr, 0,
michael@0 1120 (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
michael@0 1121 SCTP_SENDV_SPA,
michael@0 1122 0) < 0)) {
michael@0 1123 if (errno == EAGAIN || errno == EWOULDBLOCK) {
michael@0 1124 // leave queued for resend
michael@0 1125 failed_send = true;
michael@0 1126 LOG(("queue full again when resending %d bytes (%d)", len, result));
michael@0 1127 } else {
michael@0 1128 LOG(("error %d re-sending string", errno));
michael@0 1129 failed_send = true;
michael@0 1130 }
michael@0 1131 } else {
michael@0 1132 LOG(("Resent buffer of %d bytes (%d)", len, result));
michael@0 1133 sent = true;
michael@0 1134 channel->mBufferedData.RemoveElementAt(0);
michael@0 1135 }
michael@0 1136 }
michael@0 1137 if (channel->mBufferedData.IsEmpty())
michael@0 1138 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
michael@0 1139 else
michael@0 1140 still_blocked = true;
michael@0 1141 }
michael@0 1142 if (still_blocked)
michael@0 1143 break;
michael@0 1144 }
michael@0 1145
michael@0 1146 if (!still_blocked) {
michael@0 1147 // mDeferTimeout becomes an estimate of how long we need to wait next time we block
michael@0 1148 return false;
michael@0 1149 }
michael@0 1150 // adjust time? More time for next wait if we didn't send anything, less if did
michael@0 1151 // Pretty crude, but better than nothing; just to keep CPU use down
michael@0 1152 if (!sent && mDeferTimeout < 50)
michael@0 1153 mDeferTimeout++;
michael@0 1154 else if (sent && mDeferTimeout > 10)
michael@0 1155 mDeferTimeout--;
michael@0 1156
michael@0 1157 return true;
michael@0 1158 }
michael@0 1159
michael@0 1160 void
michael@0 1161 DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
michael@0 1162 size_t length,
michael@0 1163 uint16_t stream)
michael@0 1164 {
michael@0 1165 nsRefPtr<DataChannel> channel;
michael@0 1166 uint32_t prValue;
michael@0 1167 uint16_t prPolicy;
michael@0 1168 uint32_t flags;
michael@0 1169
michael@0 1170 mLock.AssertCurrentThreadOwns();
michael@0 1171
michael@0 1172 if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
michael@0 1173 LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
michael@0 1174 (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
michael@0 1175 if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
michael@0 1176 return;
michael@0 1177 }
michael@0 1178
michael@0 1179 LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
michael@0 1180
michael@0 1181 switch (req->channel_type) {
michael@0 1182 case DATA_CHANNEL_RELIABLE:
michael@0 1183 case DATA_CHANNEL_RELIABLE_UNORDERED:
michael@0 1184 prPolicy = SCTP_PR_SCTP_NONE;
michael@0 1185 break;
michael@0 1186 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
michael@0 1187 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
michael@0 1188 prPolicy = SCTP_PR_SCTP_RTX;
michael@0 1189 break;
michael@0 1190 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
michael@0 1191 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
michael@0 1192 prPolicy = SCTP_PR_SCTP_TTL;
michael@0 1193 break;
michael@0 1194 default:
michael@0 1195 LOG(("Unknown channel type", req->channel_type));
michael@0 1196 /* XXX error handling */
michael@0 1197 return;
michael@0 1198 }
michael@0 1199 prValue = ntohl(req->reliability_param);
michael@0 1200 flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
michael@0 1201
michael@0 1202 if ((channel = FindChannelByStream(stream))) {
michael@0 1203 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
michael@0 1204 LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
michael@0 1205 stream, channel->mState));
michael@0 1206 /* XXX: some error handling */
michael@0 1207 } else {
michael@0 1208 LOG(("Open for externally negotiated channel %u", stream));
michael@0 1209 // XXX should also check protocol, maybe label
michael@0 1210 if (prPolicy != channel->mPrPolicy ||
michael@0 1211 prValue != channel->mPrValue ||
michael@0 1212 flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
michael@0 1213 {
michael@0 1214 LOG(("WARNING: external negotiation mismatch with OpenRequest:"
michael@0 1215 "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
michael@0 1216 stream, prPolicy, channel->mPrPolicy,
michael@0 1217 prValue, channel->mPrValue, flags, channel->mFlags));
michael@0 1218 }
michael@0 1219 }
michael@0 1220 return;
michael@0 1221 }
michael@0 1222 if (stream >= mStreams.Length()) {
michael@0 1223 LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
michael@0 1224 return;
michael@0 1225 }
michael@0 1226
michael@0 1227 nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
michael@0 1228 nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
michael@0 1229 ntohs(req->protocol_length)));
michael@0 1230
michael@0 1231 channel = new DataChannel(this,
michael@0 1232 stream,
michael@0 1233 DataChannel::CONNECTING,
michael@0 1234 label,
michael@0 1235 protocol,
michael@0 1236 prPolicy, prValue,
michael@0 1237 flags,
michael@0 1238 nullptr, nullptr);
michael@0 1239 mStreams[stream] = channel;
michael@0 1240
michael@0 1241 channel->mState = DataChannel::WAITING_TO_OPEN;
michael@0 1242
michael@0 1243 LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
michael@0 1244 channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
michael@0 1245 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 1246 DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
michael@0 1247 this, channel));
michael@0 1248
michael@0 1249 LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
michael@0 1250
michael@0 1251 if (!SendOpenAckMessage(stream)) {
michael@0 1252 // XXX Only on EAGAIN!? And if not, then close the channel??
michael@0 1253 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
michael@0 1254 StartDefer();
michael@0 1255 }
michael@0 1256
michael@0 1257 // Now process any queued data messages for the channel (which will
michael@0 1258 // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
michael@0 1259 // more that come in before that happens)
michael@0 1260 DeliverQueuedData(stream);
michael@0 1261 }
michael@0 1262
michael@0 1263 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
michael@0 1264 // That would make this code moot. Keep it for now for backwards compatibility.
michael@0 1265 void
michael@0 1266 DataChannelConnection::DeliverQueuedData(uint16_t stream)
michael@0 1267 {
michael@0 1268 mLock.AssertCurrentThreadOwns();
michael@0 1269
michael@0 1270 uint32_t i = 0;
michael@0 1271 while (i < mQueuedData.Length()) {
michael@0 1272 // Careful! we may modify the array length from within the loop!
michael@0 1273 if (mQueuedData[i]->mStream == stream) {
michael@0 1274 LOG(("Delivering queued data for stream %u, length %u",
michael@0 1275 stream, mQueuedData[i]->mLength));
michael@0 1276 // Deliver the queued data
michael@0 1277 HandleDataMessage(mQueuedData[i]->mPpid,
michael@0 1278 mQueuedData[i]->mData, mQueuedData[i]->mLength,
michael@0 1279 mQueuedData[i]->mStream);
michael@0 1280 mQueuedData.RemoveElementAt(i);
michael@0 1281 continue; // don't bump index since we removed the element
michael@0 1282 }
michael@0 1283 i++;
michael@0 1284 }
michael@0 1285 }
michael@0 1286
michael@0 1287 void
michael@0 1288 DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
michael@0 1289 size_t length, uint16_t stream)
michael@0 1290 {
michael@0 1291 DataChannel *channel;
michael@0 1292
michael@0 1293 mLock.AssertCurrentThreadOwns();
michael@0 1294
michael@0 1295 channel = FindChannelByStream(stream);
michael@0 1296 NS_ENSURE_TRUE_VOID(channel);
michael@0 1297
michael@0 1298 LOG(("OpenAck received for stream %u, waiting=%d", stream,
michael@0 1299 (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
michael@0 1300
michael@0 1301 channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
michael@0 1302 }
michael@0 1303
michael@0 1304 void
michael@0 1305 DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
michael@0 1306 {
michael@0 1307 /* XXX: Send an error message? */
michael@0 1308 LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
michael@0 1309 // XXX Log to JS error console if possible
michael@0 1310 }
michael@0 1311
michael@0 1312 void
michael@0 1313 DataChannelConnection::HandleDataMessage(uint32_t ppid,
michael@0 1314 const void *data, size_t length,
michael@0 1315 uint16_t stream)
michael@0 1316 {
michael@0 1317 DataChannel *channel;
michael@0 1318 const char *buffer = (const char *) data;
michael@0 1319
michael@0 1320 mLock.AssertCurrentThreadOwns();
michael@0 1321
michael@0 1322 channel = FindChannelByStream(stream);
michael@0 1323
michael@0 1324 // XXX A closed channel may trip this... check
michael@0 1325 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
michael@0 1326 // That would make this code moot. Keep it for now for backwards compatibility.
michael@0 1327 if (!channel) {
michael@0 1328 // In the updated 0-RTT open case, the sender can send data immediately
michael@0 1329 // after Open, and doesn't set the in-order bit (since we don't have a
michael@0 1330 // response or ack). Also, with external negotiation, data can come in
michael@0 1331 // before we're told about the external negotiation. We need to buffer
michael@0 1332 // data until either a) Open comes in, if the ordering get messed up,
michael@0 1333 // or b) the app tells us this channel was externally negotiated. When
michael@0 1334 // these occur, we deliver the data.
michael@0 1335
michael@0 1336 // Since this is rare and non-performance, keep a single list of queued
michael@0 1337 // data messages to deliver once the channel opens.
michael@0 1338 LOG(("Queuing data for stream %u, length %u", stream, length));
michael@0 1339 // Copies data
michael@0 1340 mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
michael@0 1341 return;
michael@0 1342 }
michael@0 1343
michael@0 1344 // XXX should this be a simple if, no warnings/debugbreaks?
michael@0 1345 NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
michael@0 1346
michael@0 1347 {
michael@0 1348 nsAutoCString recvData(buffer, length); // copies (<64) or allocates
michael@0 1349 bool is_binary = true;
michael@0 1350
michael@0 1351 if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
michael@0 1352 ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
michael@0 1353 is_binary = false;
michael@0 1354 }
michael@0 1355 if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
michael@0 1356 NS_WARNING("DataChannel message aborted by fragment type change!");
michael@0 1357 channel->mRecvBuffer.Truncate(0);
michael@0 1358 }
michael@0 1359 channel->mIsRecvBinary = is_binary;
michael@0 1360
michael@0 1361 switch (ppid) {
michael@0 1362 case DATA_CHANNEL_PPID_DOMSTRING:
michael@0 1363 case DATA_CHANNEL_PPID_BINARY:
michael@0 1364 channel->mRecvBuffer += recvData;
michael@0 1365 LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
michael@0 1366 is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
michael@0 1367 channel->mStream));
michael@0 1368 return; // Not ready to notify application
michael@0 1369
michael@0 1370 case DATA_CHANNEL_PPID_DOMSTRING_LAST:
michael@0 1371 LOG(("DataChannel: String message received of length %lu on channel %u",
michael@0 1372 length, channel->mStream));
michael@0 1373 if (!channel->mRecvBuffer.IsEmpty()) {
michael@0 1374 channel->mRecvBuffer += recvData;
michael@0 1375 LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
michael@0 1376 channel->SendOrQueue(new DataChannelOnMessageAvailable(
michael@0 1377 DataChannelOnMessageAvailable::ON_DATA, this,
michael@0 1378 channel, channel->mRecvBuffer, -1));
michael@0 1379 channel->mRecvBuffer.Truncate(0);
michael@0 1380 return;
michael@0 1381 }
michael@0 1382 // else send using recvData normally
michael@0 1383 length = -1; // Flag for DOMString
michael@0 1384
michael@0 1385 // WebSockets checks IsUTF8() here; we can try to deliver it
michael@0 1386 break;
michael@0 1387
michael@0 1388 case DATA_CHANNEL_PPID_BINARY_LAST:
michael@0 1389 LOG(("DataChannel: Received binary message of length %lu on channel id %u",
michael@0 1390 length, channel->mStream));
michael@0 1391 if (!channel->mRecvBuffer.IsEmpty()) {
michael@0 1392 channel->mRecvBuffer += recvData;
michael@0 1393 LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
michael@0 1394 channel->SendOrQueue(new DataChannelOnMessageAvailable(
michael@0 1395 DataChannelOnMessageAvailable::ON_DATA, this,
michael@0 1396 channel, channel->mRecvBuffer,
michael@0 1397 channel->mRecvBuffer.Length()));
michael@0 1398 channel->mRecvBuffer.Truncate(0);
michael@0 1399 return;
michael@0 1400 }
michael@0 1401 // else send using recvData normally
michael@0 1402 break;
michael@0 1403
michael@0 1404 default:
michael@0 1405 NS_ERROR("Unknown data PPID");
michael@0 1406 return;
michael@0 1407 }
michael@0 1408 /* Notify onmessage */
michael@0 1409 LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
michael@0 1410 channel->SendOrQueue(new DataChannelOnMessageAvailable(
michael@0 1411 DataChannelOnMessageAvailable::ON_DATA, this,
michael@0 1412 channel, recvData, length));
michael@0 1413 }
michael@0 1414 }
michael@0 1415
michael@0 1416 // Called with mLock locked!
michael@0 1417 void
michael@0 1418 DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
michael@0 1419 {
michael@0 1420 const struct rtcweb_datachannel_open_request *req;
michael@0 1421 const struct rtcweb_datachannel_ack *ack;
michael@0 1422
michael@0 1423 mLock.AssertCurrentThreadOwns();
michael@0 1424
michael@0 1425 switch (ppid) {
michael@0 1426 case DATA_CHANNEL_PPID_CONTROL:
michael@0 1427 req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
michael@0 1428
michael@0 1429 NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
michael@0 1430 switch (req->msg_type) {
michael@0 1431 case DATA_CHANNEL_OPEN_REQUEST:
michael@0 1432 // structure includes a possibly-unused char label[1] (in a packed structure)
michael@0 1433 NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
michael@0 1434
michael@0 1435 HandleOpenRequestMessage(req, length, stream);
michael@0 1436 break;
michael@0 1437 case DATA_CHANNEL_ACK:
michael@0 1438 // >= sizeof(*ack) checked above
michael@0 1439
michael@0 1440 ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
michael@0 1441 HandleOpenAckMessage(ack, length, stream);
michael@0 1442 break;
michael@0 1443 default:
michael@0 1444 HandleUnknownMessage(ppid, length, stream);
michael@0 1445 break;
michael@0 1446 }
michael@0 1447 break;
michael@0 1448 case DATA_CHANNEL_PPID_DOMSTRING:
michael@0 1449 case DATA_CHANNEL_PPID_DOMSTRING_LAST:
michael@0 1450 case DATA_CHANNEL_PPID_BINARY:
michael@0 1451 case DATA_CHANNEL_PPID_BINARY_LAST:
michael@0 1452 HandleDataMessage(ppid, buffer, length, stream);
michael@0 1453 break;
michael@0 1454 default:
michael@0 1455 LOG(("Message of length %lu, PPID %u on stream %u received.",
michael@0 1456 length, ppid, stream));
michael@0 1457 break;
michael@0 1458 }
michael@0 1459 }
michael@0 1460
michael@0 1461 void
michael@0 1462 DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
michael@0 1463 {
michael@0 1464 uint32_t i, n;
michael@0 1465
michael@0 1466 switch (sac->sac_state) {
michael@0 1467 case SCTP_COMM_UP:
michael@0 1468 LOG(("Association change: SCTP_COMM_UP"));
michael@0 1469 if (mState == CONNECTING) {
michael@0 1470 mSocket = mMasterSocket;
michael@0 1471 mState = OPEN;
michael@0 1472
michael@0 1473 SetEvenOdd();
michael@0 1474
michael@0 1475 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 1476 DataChannelOnMessageAvailable::ON_CONNECTION,
michael@0 1477 this, true));
michael@0 1478 LOG(("DTLS connect() succeeded! Entering connected mode"));
michael@0 1479
michael@0 1480 // Open any streams pending...
michael@0 1481 ProcessQueuedOpens();
michael@0 1482
michael@0 1483 } else if (mState == OPEN) {
michael@0 1484 LOG(("DataConnection Already OPEN"));
michael@0 1485 } else {
michael@0 1486 LOG(("Unexpected state: %d", mState));
michael@0 1487 }
michael@0 1488 break;
michael@0 1489 case SCTP_COMM_LOST:
michael@0 1490 LOG(("Association change: SCTP_COMM_LOST"));
michael@0 1491 // This association is toast, so also close all the channels -- from mainthread!
michael@0 1492 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 1493 DataChannelOnMessageAvailable::ON_DISCONNECTED,
michael@0 1494 this));
michael@0 1495 break;
michael@0 1496 case SCTP_RESTART:
michael@0 1497 LOG(("Association change: SCTP_RESTART"));
michael@0 1498 break;
michael@0 1499 case SCTP_SHUTDOWN_COMP:
michael@0 1500 LOG(("Association change: SCTP_SHUTDOWN_COMP"));
michael@0 1501 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 1502 DataChannelOnMessageAvailable::ON_DISCONNECTED,
michael@0 1503 this));
michael@0 1504 break;
michael@0 1505 case SCTP_CANT_STR_ASSOC:
michael@0 1506 LOG(("Association change: SCTP_CANT_STR_ASSOC"));
michael@0 1507 break;
michael@0 1508 default:
michael@0 1509 LOG(("Association change: UNKNOWN"));
michael@0 1510 break;
michael@0 1511 }
michael@0 1512 LOG(("Association change: streams (in/out) = (%u/%u)",
michael@0 1513 sac->sac_inbound_streams, sac->sac_outbound_streams));
michael@0 1514
michael@0 1515 NS_ENSURE_TRUE_VOID(sac);
michael@0 1516 n = sac->sac_length - sizeof(*sac);
michael@0 1517 if (((sac->sac_state == SCTP_COMM_UP) ||
michael@0 1518 (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
michael@0 1519 for (i = 0; i < n; ++i) {
michael@0 1520 switch (sac->sac_info[i]) {
michael@0 1521 case SCTP_ASSOC_SUPPORTS_PR:
michael@0 1522 LOG(("Supports: PR"));
michael@0 1523 break;
michael@0 1524 case SCTP_ASSOC_SUPPORTS_AUTH:
michael@0 1525 LOG(("Supports: AUTH"));
michael@0 1526 break;
michael@0 1527 case SCTP_ASSOC_SUPPORTS_ASCONF:
michael@0 1528 LOG(("Supports: ASCONF"));
michael@0 1529 break;
michael@0 1530 case SCTP_ASSOC_SUPPORTS_MULTIBUF:
michael@0 1531 LOG(("Supports: MULTIBUF"));
michael@0 1532 break;
michael@0 1533 case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
michael@0 1534 LOG(("Supports: RE-CONFIG"));
michael@0 1535 break;
michael@0 1536 default:
michael@0 1537 LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
michael@0 1538 break;
michael@0 1539 }
michael@0 1540 }
michael@0 1541 } else if (((sac->sac_state == SCTP_COMM_LOST) ||
michael@0 1542 (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
michael@0 1543 LOG(("Association: ABORT ="));
michael@0 1544 for (i = 0; i < n; ++i) {
michael@0 1545 LOG((" 0x%02x", sac->sac_info[i]));
michael@0 1546 }
michael@0 1547 }
michael@0 1548 if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
michael@0 1549 (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
michael@0 1550 (sac->sac_state == SCTP_COMM_LOST)) {
michael@0 1551 return;
michael@0 1552 }
michael@0 1553 }
michael@0 1554
michael@0 1555 void
michael@0 1556 DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
michael@0 1557 {
michael@0 1558 char addr_buf[INET6_ADDRSTRLEN];
michael@0 1559 const char *addr = "";
michael@0 1560 struct sockaddr_in *sin;
michael@0 1561 struct sockaddr_in6 *sin6;
michael@0 1562 #if defined(__Userspace_os_Windows)
michael@0 1563 DWORD addr_len = INET6_ADDRSTRLEN;
michael@0 1564 #endif
michael@0 1565
michael@0 1566 switch (spc->spc_aaddr.ss_family) {
michael@0 1567 case AF_INET:
michael@0 1568 sin = (struct sockaddr_in *)&spc->spc_aaddr;
michael@0 1569 #if !defined(__Userspace_os_Windows)
michael@0 1570 addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
michael@0 1571 #else
michael@0 1572 if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr,
michael@0 1573 addr_buf, &addr_len)) {
michael@0 1574 return;
michael@0 1575 }
michael@0 1576 #endif
michael@0 1577 break;
michael@0 1578 case AF_INET6:
michael@0 1579 sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
michael@0 1580 #if !defined(__Userspace_os_Windows)
michael@0 1581 addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
michael@0 1582 #else
michael@0 1583 if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr,
michael@0 1584 addr_buf, &addr_len)) {
michael@0 1585 return;
michael@0 1586 }
michael@0 1587 #endif
michael@0 1588 case AF_CONN:
michael@0 1589 addr = "DTLS connection";
michael@0 1590 break;
michael@0 1591 default:
michael@0 1592 break;
michael@0 1593 }
michael@0 1594 LOG(("Peer address %s is now ", addr));
michael@0 1595 switch (spc->spc_state) {
michael@0 1596 case SCTP_ADDR_AVAILABLE:
michael@0 1597 LOG(("SCTP_ADDR_AVAILABLE"));
michael@0 1598 break;
michael@0 1599 case SCTP_ADDR_UNREACHABLE:
michael@0 1600 LOG(("SCTP_ADDR_UNREACHABLE"));
michael@0 1601 break;
michael@0 1602 case SCTP_ADDR_REMOVED:
michael@0 1603 LOG(("SCTP_ADDR_REMOVED"));
michael@0 1604 break;
michael@0 1605 case SCTP_ADDR_ADDED:
michael@0 1606 LOG(("SCTP_ADDR_ADDED"));
michael@0 1607 break;
michael@0 1608 case SCTP_ADDR_MADE_PRIM:
michael@0 1609 LOG(("SCTP_ADDR_MADE_PRIM"));
michael@0 1610 break;
michael@0 1611 case SCTP_ADDR_CONFIRMED:
michael@0 1612 LOG(("SCTP_ADDR_CONFIRMED"));
michael@0 1613 break;
michael@0 1614 default:
michael@0 1615 LOG(("UNKNOWN"));
michael@0 1616 break;
michael@0 1617 }
michael@0 1618 LOG((" (error = 0x%08x).\n", spc->spc_error));
michael@0 1619 }
michael@0 1620
michael@0 1621 void
michael@0 1622 DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
michael@0 1623 {
michael@0 1624 size_t i, n;
michael@0 1625
michael@0 1626 n = sre->sre_length - sizeof(struct sctp_remote_error);
michael@0 1627 LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
michael@0 1628 for (i = 0; i < n; ++i) {
michael@0 1629 LOG((" 0x%02x", sre-> sre_data[i]));
michael@0 1630 }
michael@0 1631 }
michael@0 1632
michael@0 1633 void
michael@0 1634 DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
michael@0 1635 {
michael@0 1636 LOG(("Shutdown event."));
michael@0 1637 /* XXX: notify all channels. */
michael@0 1638 // Attempts to actually send anything will fail
michael@0 1639 }
michael@0 1640
michael@0 1641 void
michael@0 1642 DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
michael@0 1643 {
michael@0 1644 LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
michael@0 1645 }
michael@0 1646
michael@0 1647 void
michael@0 1648 DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
michael@0 1649 {
michael@0 1650 size_t i, n;
michael@0 1651
michael@0 1652 if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
michael@0 1653 LOG(("Unsent "));
michael@0 1654 }
michael@0 1655 if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
michael@0 1656 LOG(("Sent "));
michael@0 1657 }
michael@0 1658 if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
michael@0 1659 LOG(("(flags = %x) ", ssfe->ssfe_flags));
michael@0 1660 }
michael@0 1661 LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
michael@0 1662 ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
michael@0 1663 ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
michael@0 1664 n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
michael@0 1665 for (i = 0; i < n; ++i) {
michael@0 1666 LOG((" 0x%02x", ssfe->ssfe_data[i]));
michael@0 1667 }
michael@0 1668 }
michael@0 1669
michael@0 1670 void
michael@0 1671 DataChannelConnection::ClearResets()
michael@0 1672 {
michael@0 1673 // Clear all pending resets
michael@0 1674 if (!mStreamsResetting.IsEmpty()) {
michael@0 1675 LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
michael@0 1676 }
michael@0 1677
michael@0 1678 for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
michael@0 1679 nsRefPtr<DataChannel> channel;
michael@0 1680 channel = FindChannelByStream(mStreamsResetting[i]);
michael@0 1681 if (channel) {
michael@0 1682 LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
michael@0 1683 mStreams[channel->mStream] = nullptr;
michael@0 1684 }
michael@0 1685 }
michael@0 1686 mStreamsResetting.Clear();
michael@0 1687 }
michael@0 1688
michael@0 1689 void
michael@0 1690 DataChannelConnection::ResetOutgoingStream(uint16_t stream)
michael@0 1691 {
michael@0 1692 uint32_t i;
michael@0 1693
michael@0 1694 mLock.AssertCurrentThreadOwns();
michael@0 1695 LOG(("Connection %p: Resetting outgoing stream %u",
michael@0 1696 (void *) this, stream));
michael@0 1697 // Rarely has more than a couple items and only for a short time
michael@0 1698 for (i = 0; i < mStreamsResetting.Length(); ++i) {
michael@0 1699 if (mStreamsResetting[i] == stream) {
michael@0 1700 return;
michael@0 1701 }
michael@0 1702 }
michael@0 1703 mStreamsResetting.AppendElement(stream);
michael@0 1704 }
michael@0 1705
michael@0 1706 void
michael@0 1707 DataChannelConnection::SendOutgoingStreamReset()
michael@0 1708 {
michael@0 1709 struct sctp_reset_streams *srs;
michael@0 1710 uint32_t i;
michael@0 1711 size_t len;
michael@0 1712
michael@0 1713 LOG(("Connection %p: Sending outgoing stream reset for %d streams",
michael@0 1714 (void *) this, mStreamsResetting.Length()));
michael@0 1715 mLock.AssertCurrentThreadOwns();
michael@0 1716 if (mStreamsResetting.IsEmpty()) {
michael@0 1717 LOG(("No streams to reset"));
michael@0 1718 return;
michael@0 1719 }
michael@0 1720 len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
michael@0 1721 srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
michael@0 1722 memset(srs, 0, len);
michael@0 1723 srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
michael@0 1724 srs->srs_number_streams = mStreamsResetting.Length();
michael@0 1725 for (i = 0; i < mStreamsResetting.Length(); ++i) {
michael@0 1726 srs->srs_stream_list[i] = mStreamsResetting[i];
michael@0 1727 }
michael@0 1728 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
michael@0 1729 LOG(("***failed: setsockopt RESET, errno %d", errno));
michael@0 1730 // if errno == EALREADY, this is normal - we can't send another reset
michael@0 1731 // with one pending.
michael@0 1732 // When we get an incoming reset (which may be a response to our
michael@0 1733 // outstanding one), see if we have any pending outgoing resets and
michael@0 1734 // send them
michael@0 1735 } else {
michael@0 1736 mStreamsResetting.Clear();
michael@0 1737 }
michael@0 1738 moz_free(srs);
michael@0 1739 }
michael@0 1740
michael@0 1741 void
michael@0 1742 DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
michael@0 1743 {
michael@0 1744 uint32_t n, i;
michael@0 1745 nsRefPtr<DataChannel> channel; // since we may null out the ref to the channel
michael@0 1746
michael@0 1747 if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
michael@0 1748 !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
michael@0 1749 n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
michael@0 1750 for (i = 0; i < n; ++i) {
michael@0 1751 if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
michael@0 1752 channel = FindChannelByStream(strrst->strreset_stream_list[i]);
michael@0 1753 if (channel) {
michael@0 1754 // The other side closed the channel
michael@0 1755 // We could be in three states:
michael@0 1756 // 1. Normal state (input and output streams (OPEN)
michael@0 1757 // Notify application, send a RESET in response on our
michael@0 1758 // outbound channel. Go to CLOSED
michael@0 1759 // 2. We sent our own reset (CLOSING); either they crossed on the
michael@0 1760 // wire, or this is a response to our Reset.
michael@0 1761 // Go to CLOSED
michael@0 1762 // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
michael@0 1763 // I believe this is impossible, as we don't have an input stream yet.
michael@0 1764
michael@0 1765 LOG(("Incoming: Channel %u closed, state %d",
michael@0 1766 channel->mStream, channel->mState));
michael@0 1767 ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
michael@0 1768 channel->mState == DataChannel::CLOSING ||
michael@0 1769 channel->mState == DataChannel::CONNECTING ||
michael@0 1770 channel->mState == DataChannel::WAITING_TO_OPEN);
michael@0 1771 if (channel->mState == DataChannel::OPEN ||
michael@0 1772 channel->mState == DataChannel::WAITING_TO_OPEN) {
michael@0 1773 ResetOutgoingStream(channel->mStream);
michael@0 1774 SendOutgoingStreamReset();
michael@0 1775 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 1776 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
michael@0 1777 channel));
michael@0 1778 }
michael@0 1779 mStreams[channel->mStream] = nullptr;
michael@0 1780
michael@0 1781 LOG(("Disconnected DataChannel %p from connection %p",
michael@0 1782 (void *) channel.get(), (void *) channel->mConnection.get()));
michael@0 1783 channel->Destroy();
michael@0 1784 // At this point when we leave here, the object is a zombie held alive only by the DOM object
michael@0 1785 } else {
michael@0 1786 LOG(("Can't find incoming channel %d",i));
michael@0 1787 }
michael@0 1788 }
michael@0 1789 }
michael@0 1790 }
michael@0 1791
michael@0 1792 // In case we failed to send a RESET due to having one outstanding, process any pending resets now:
michael@0 1793 if (!mStreamsResetting.IsEmpty()) {
michael@0 1794 LOG(("Sending %d pending resets", mStreamsResetting.Length()));
michael@0 1795 SendOutgoingStreamReset();
michael@0 1796 }
michael@0 1797 }
michael@0 1798
michael@0 1799 void
michael@0 1800 DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
michael@0 1801 {
michael@0 1802 uint16_t stream;
michael@0 1803 uint32_t i;
michael@0 1804 nsRefPtr<DataChannel> channel;
michael@0 1805
michael@0 1806 if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
michael@0 1807 LOG(("*** Failed increasing number of streams from %u (%u/%u)",
michael@0 1808 mStreams.Length(),
michael@0 1809 strchg->strchange_instrms,
michael@0 1810 strchg->strchange_outstrms));
michael@0 1811 // XXX FIX! notify pending opens of failure
michael@0 1812 return;
michael@0 1813 } else {
michael@0 1814 if (strchg->strchange_instrms > mStreams.Length()) {
michael@0 1815 LOG(("Other side increased streams from %u to %u",
michael@0 1816 mStreams.Length(), strchg->strchange_instrms));
michael@0 1817 }
michael@0 1818 if (strchg->strchange_outstrms > mStreams.Length() ||
michael@0 1819 strchg->strchange_instrms > mStreams.Length()) {
michael@0 1820 uint16_t old_len = mStreams.Length();
michael@0 1821 uint16_t new_len = std::max(strchg->strchange_outstrms,
michael@0 1822 strchg->strchange_instrms);
michael@0 1823 LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
michael@0 1824 old_len, new_len, new_len - old_len,
michael@0 1825 strchg->strchange_instrms));
michael@0 1826 // make sure both are the same length
michael@0 1827 mStreams.AppendElements(new_len - old_len);
michael@0 1828 LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
michael@0 1829 for (size_t i = old_len; i < mStreams.Length(); ++i) {
michael@0 1830 mStreams[i] = nullptr;
michael@0 1831 }
michael@0 1832 // Re-process any channels waiting for streams.
michael@0 1833 // Linear search, but we don't increase channels often and
michael@0 1834 // the array would only get long in case of an app error normally
michael@0 1835
michael@0 1836 // Make sure we request enough streams if there's a big jump in streams
michael@0 1837 // Could make a more complex API for OpenXxxFinish() and avoid this loop
michael@0 1838 int32_t num_needed = mPending.GetSize();
michael@0 1839 LOG(("%d of %d new streams already needed", num_needed,
michael@0 1840 new_len - old_len));
michael@0 1841 num_needed -= (new_len - old_len); // number we added
michael@0 1842 if (num_needed > 0) {
michael@0 1843 if (num_needed < 16)
michael@0 1844 num_needed = 16;
michael@0 1845 LOG(("Not enough new streams, asking for %d more", num_needed));
michael@0 1846 RequestMoreStreams(num_needed);
michael@0 1847 } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
michael@0 1848 LOG(("Requesting %d output streams to match partner",
michael@0 1849 strchg->strchange_instrms - strchg->strchange_outstrms));
michael@0 1850 RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
michael@0 1851 }
michael@0 1852
michael@0 1853 ProcessQueuedOpens();
michael@0 1854 }
michael@0 1855 // else probably not a change in # of streams
michael@0 1856 }
michael@0 1857
michael@0 1858 for (i = 0; i < mStreams.Length(); ++i) {
michael@0 1859 channel = mStreams[i];
michael@0 1860 if (!channel)
michael@0 1861 continue;
michael@0 1862
michael@0 1863 if ((channel->mState == CONNECTING) &&
michael@0 1864 (channel->mStream == INVALID_STREAM)) {
michael@0 1865 if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
michael@0 1866 (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
michael@0 1867 /* XXX: Signal to the other end. */
michael@0 1868 channel->mState = CLOSED;
michael@0 1869 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 1870 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
michael@0 1871 channel));
michael@0 1872 // maybe fire onError (bug 843625)
michael@0 1873 } else {
michael@0 1874 stream = FindFreeStream();
michael@0 1875 if (stream != INVALID_STREAM) {
michael@0 1876 channel->mStream = stream;
michael@0 1877 mStreams[stream] = channel;
michael@0 1878 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
michael@0 1879 /// XXX fix
michael@0 1880 StartDefer();
michael@0 1881 } else {
michael@0 1882 /* We will not find more ... */
michael@0 1883 break;
michael@0 1884 }
michael@0 1885 }
michael@0 1886 }
michael@0 1887 }
michael@0 1888 }
michael@0 1889
michael@0 1890
michael@0 1891 // Called with mLock locked!
michael@0 1892 void
michael@0 1893 DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
michael@0 1894 {
michael@0 1895 mLock.AssertCurrentThreadOwns();
michael@0 1896 if (notif->sn_header.sn_length != (uint32_t)n) {
michael@0 1897 return;
michael@0 1898 }
michael@0 1899 switch (notif->sn_header.sn_type) {
michael@0 1900 case SCTP_ASSOC_CHANGE:
michael@0 1901 HandleAssociationChangeEvent(&(notif->sn_assoc_change));
michael@0 1902 break;
michael@0 1903 case SCTP_PEER_ADDR_CHANGE:
michael@0 1904 HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
michael@0 1905 break;
michael@0 1906 case SCTP_REMOTE_ERROR:
michael@0 1907 HandleRemoteErrorEvent(&(notif->sn_remote_error));
michael@0 1908 break;
michael@0 1909 case SCTP_SHUTDOWN_EVENT:
michael@0 1910 HandleShutdownEvent(&(notif->sn_shutdown_event));
michael@0 1911 break;
michael@0 1912 case SCTP_ADAPTATION_INDICATION:
michael@0 1913 HandleAdaptationIndication(&(notif->sn_adaptation_event));
michael@0 1914 break;
michael@0 1915 case SCTP_PARTIAL_DELIVERY_EVENT:
michael@0 1916 LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
michael@0 1917 break;
michael@0 1918 case SCTP_AUTHENTICATION_EVENT:
michael@0 1919 LOG(("SCTP_AUTHENTICATION_EVENT"));
michael@0 1920 break;
michael@0 1921 case SCTP_SENDER_DRY_EVENT:
michael@0 1922 //LOG(("SCTP_SENDER_DRY_EVENT"));
michael@0 1923 break;
michael@0 1924 case SCTP_NOTIFICATIONS_STOPPED_EVENT:
michael@0 1925 LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
michael@0 1926 break;
michael@0 1927 case SCTP_SEND_FAILED_EVENT:
michael@0 1928 HandleSendFailedEvent(&(notif->sn_send_failed_event));
michael@0 1929 break;
michael@0 1930 case SCTP_STREAM_RESET_EVENT:
michael@0 1931 HandleStreamResetEvent(&(notif->sn_strreset_event));
michael@0 1932 break;
michael@0 1933 case SCTP_ASSOC_RESET_EVENT:
michael@0 1934 LOG(("SCTP_ASSOC_RESET_EVENT"));
michael@0 1935 break;
michael@0 1936 case SCTP_STREAM_CHANGE_EVENT:
michael@0 1937 HandleStreamChangeEvent(&(notif->sn_strchange_event));
michael@0 1938 break;
michael@0 1939 default:
michael@0 1940 LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
michael@0 1941 break;
michael@0 1942 }
michael@0 1943 }
michael@0 1944
michael@0 1945 int
michael@0 1946 DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
michael@0 1947 struct sctp_rcvinfo rcv, int32_t flags)
michael@0 1948 {
michael@0 1949 ASSERT_WEBRTC(!NS_IsMainThread());
michael@0 1950
michael@0 1951 if (!data) {
michael@0 1952 usrsctp_close(sock); // SCTP has finished shutting down
michael@0 1953 } else {
michael@0 1954 MutexAutoLock lock(mLock);
michael@0 1955 if (flags & MSG_NOTIFICATION) {
michael@0 1956 HandleNotification(static_cast<union sctp_notification *>(data), datalen);
michael@0 1957 } else {
michael@0 1958 HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
michael@0 1959 }
michael@0 1960 }
michael@0 1961 // sctp allocates 'data' with malloc(), and expects the receiver to free
michael@0 1962 // it (presumably with free).
michael@0 1963 // XXX future optimization: try to deliver messages without an internal
michael@0 1964 // alloc/copy, and if so delay the free until later.
michael@0 1965 free(data);
michael@0 1966 // usrsctp defines the callback as returning an int, but doesn't use it
michael@0 1967 return 1;
michael@0 1968 }
michael@0 1969
michael@0 1970 already_AddRefed<DataChannel>
michael@0 1971 DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
michael@0 1972 Type type, bool inOrder,
michael@0 1973 uint32_t prValue, DataChannelListener *aListener,
michael@0 1974 nsISupports *aContext, bool aExternalNegotiated,
michael@0 1975 uint16_t aStream)
michael@0 1976 {
michael@0 1977 // aStream == INVALID_STREAM to have the protocol allocate
michael@0 1978 uint16_t prPolicy = SCTP_PR_SCTP_NONE;
michael@0 1979 uint32_t flags;
michael@0 1980
michael@0 1981 LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
michael@0 1982 PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
michael@0 1983 type, inOrder, prValue, aListener, aContext,
michael@0 1984 aExternalNegotiated ? "true" : "false", aStream));
michael@0 1985 switch (type) {
michael@0 1986 case DATA_CHANNEL_RELIABLE:
michael@0 1987 prPolicy = SCTP_PR_SCTP_NONE;
michael@0 1988 break;
michael@0 1989 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
michael@0 1990 prPolicy = SCTP_PR_SCTP_RTX;
michael@0 1991 break;
michael@0 1992 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
michael@0 1993 prPolicy = SCTP_PR_SCTP_TTL;
michael@0 1994 break;
michael@0 1995 }
michael@0 1996 if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
michael@0 1997 return nullptr;
michael@0 1998 }
michael@0 1999
michael@0 2000 // Don't look past currently-negotiated streams
michael@0 2001 if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
michael@0 2002 LOG(("ERROR: external negotiation of already-open channel %u", aStream));
michael@0 2003 // XXX How do we indicate this up to the application? Probably the
michael@0 2004 // caller's job, but we may need to return an error code.
michael@0 2005 return nullptr;
michael@0 2006 }
michael@0 2007
michael@0 2008 flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
michael@0 2009 nsRefPtr<DataChannel> channel(new DataChannel(this,
michael@0 2010 aStream,
michael@0 2011 DataChannel::CONNECTING,
michael@0 2012 label, protocol,
michael@0 2013 type, prValue,
michael@0 2014 flags,
michael@0 2015 aListener, aContext));
michael@0 2016 if (aExternalNegotiated) {
michael@0 2017 channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
michael@0 2018 }
michael@0 2019
michael@0 2020 MutexAutoLock lock(mLock); // OpenFinish assumes this
michael@0 2021 return OpenFinish(channel.forget());
michael@0 2022 }
michael@0 2023
michael@0 2024 // Separate routine so we can also call it to finish up from pending opens
michael@0 2025 already_AddRefed<DataChannel>
michael@0 2026 DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
michael@0 2027 {
michael@0 2028 nsRefPtr<DataChannel> channel(aChannel); // takes the reference passed in
michael@0 2029 // Normally 1 reference if called from ::Open(), or 2 if called from
michael@0 2030 // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
michael@0 2031 uint16_t stream = channel->mStream;
michael@0 2032 bool queue = false;
michael@0 2033
michael@0 2034 mLock.AssertCurrentThreadOwns();
michael@0 2035
michael@0 2036 // Cases we care about:
michael@0 2037 // Pre-negotiated:
michael@0 2038 // Not Open:
michael@0 2039 // Doesn't fit:
michael@0 2040 // -> change initial ask or renegotiate after open
michael@0 2041 // -> queue open
michael@0 2042 // Open:
michael@0 2043 // Doesn't fit:
michael@0 2044 // -> RequestMoreStreams && queue
michael@0 2045 // Does fit:
michael@0 2046 // -> open
michael@0 2047 // Not negotiated:
michael@0 2048 // Not Open:
michael@0 2049 // -> queue open
michael@0 2050 // Open:
michael@0 2051 // -> Try to get a stream
michael@0 2052 // Doesn't fit:
michael@0 2053 // -> RequestMoreStreams && queue
michael@0 2054 // Does fit:
michael@0 2055 // -> open
michael@0 2056 // So the Open cases are basically the same
michael@0 2057 // Not Open cases are simply queue for non-negotiated, and
michael@0 2058 // either change the initial ask or possibly renegotiate after open.
michael@0 2059
michael@0 2060 if (mState == OPEN) {
michael@0 2061 if (stream == INVALID_STREAM) {
michael@0 2062 stream = FindFreeStream(); // may be INVALID_STREAM if we need more
michael@0 2063 }
michael@0 2064 if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
michael@0 2065 // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
michael@0 2066 // to avoid going back immediately for more if the ask to N, N+1, etc
michael@0 2067 int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
michael@0 2068 (stream-((int32_t)mStreams.Length())) + 16;
michael@0 2069 if (!RequestMoreStreams(more_needed)) {
michael@0 2070 // Something bad happened... we're done
michael@0 2071 goto request_error_cleanup;
michael@0 2072 }
michael@0 2073 queue = true;
michael@0 2074 }
michael@0 2075 } else {
michael@0 2076 // not OPEN
michael@0 2077 if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
michael@0 2078 mState == CLOSED) {
michael@0 2079 // Update number of streams for init message
michael@0 2080 struct sctp_initmsg initmsg;
michael@0 2081 socklen_t len = sizeof(initmsg);
michael@0 2082 int32_t total_needed = stream+16;
michael@0 2083
michael@0 2084 memset(&initmsg, 0, sizeof(initmsg));
michael@0 2085 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
michael@0 2086 LOG(("*** failed getsockopt SCTP_INITMSG"));
michael@0 2087 goto request_error_cleanup;
michael@0 2088 }
michael@0 2089 LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
michael@0 2090 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
michael@0 2091 initmsg.sinit_num_ostreams = total_needed;
michael@0 2092 initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
michael@0 2093 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
michael@0 2094 (socklen_t)sizeof(initmsg)) < 0) {
michael@0 2095 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
michael@0 2096 goto request_error_cleanup;
michael@0 2097 }
michael@0 2098
michael@0 2099 int32_t old_len = mStreams.Length();
michael@0 2100 mStreams.AppendElements(total_needed - old_len);
michael@0 2101 for (int32_t i = old_len; i < total_needed; ++i) {
michael@0 2102 mStreams[i] = nullptr;
michael@0 2103 }
michael@0 2104 }
michael@0 2105 // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
michael@0 2106 // is called, if needed
michael@0 2107 queue = true;
michael@0 2108 }
michael@0 2109 if (queue) {
michael@0 2110 LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
michael@0 2111 // Also serves to mark we told the app
michael@0 2112 channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
michael@0 2113 channel->AddRef(); // we need a ref for the nsDeQue and one to return
michael@0 2114 mPending.Push(channel);
michael@0 2115 return channel.forget();
michael@0 2116 }
michael@0 2117
michael@0 2118 MOZ_ASSERT(stream != INVALID_STREAM);
michael@0 2119 // just allocated (& OPEN), or externally negotiated
michael@0 2120 mStreams[stream] = channel; // holds a reference
michael@0 2121 channel->mStream = stream;
michael@0 2122
michael@0 2123 #ifdef TEST_QUEUED_DATA
michael@0 2124 // It's painful to write a test for this...
michael@0 2125 channel->mState = OPEN;
michael@0 2126 channel->mReady = true;
michael@0 2127 SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
michael@0 2128 #endif
michael@0 2129
michael@0 2130 if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
michael@0 2131 // Don't send unordered until this gets cleared
michael@0 2132 channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
michael@0 2133 }
michael@0 2134
michael@0 2135 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
michael@0 2136 if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
michael@0 2137 stream,
michael@0 2138 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
michael@0 2139 channel->mPrPolicy, channel->mPrValue)) {
michael@0 2140 LOG(("SendOpenRequest failed, errno = %d", errno));
michael@0 2141 if (errno == EAGAIN || errno == EWOULDBLOCK) {
michael@0 2142 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
michael@0 2143 StartDefer();
michael@0 2144
michael@0 2145 return channel.forget();
michael@0 2146 } else {
michael@0 2147 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
michael@0 2148 // We already returned the channel to the app.
michael@0 2149 NS_ERROR("Failed to send open request");
michael@0 2150 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 2151 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
michael@0 2152 channel));
michael@0 2153 }
michael@0 2154 // If we haven't returned the channel yet, it will get destroyed when we exit
michael@0 2155 // this function.
michael@0 2156 mStreams[stream] = nullptr;
michael@0 2157 channel->mStream = INVALID_STREAM;
michael@0 2158 // we'll be destroying the channel
michael@0 2159 channel->mState = CLOSED;
michael@0 2160 return nullptr;
michael@0 2161 }
michael@0 2162 /* NOTREACHED */
michael@0 2163 }
michael@0 2164 }
michael@0 2165 // Either externally negotiated or we sent Open
michael@0 2166 channel->mState = OPEN;
michael@0 2167 channel->mReady = true;
michael@0 2168 // FIX? Move into DOMDataChannel? I don't think we can send it yet here
michael@0 2169 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
michael@0 2170 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 2171 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
michael@0 2172 channel));
michael@0 2173
michael@0 2174 return channel.forget();
michael@0 2175
michael@0 2176 request_error_cleanup:
michael@0 2177 channel->mState = CLOSED;
michael@0 2178 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
michael@0 2179 // We already returned the channel to the app.
michael@0 2180 NS_ERROR("Failed to request more streams");
michael@0 2181 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 2182 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
michael@0 2183 channel));
michael@0 2184 return channel.forget();
michael@0 2185 }
michael@0 2186 // we'll be destroying the channel, but it never really got set up
michael@0 2187 // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
michael@0 2188 // Dispatch it to ourselves
michael@0 2189 return nullptr;
michael@0 2190 }
michael@0 2191
michael@0 2192 int32_t
michael@0 2193 DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
michael@0 2194 uint32_t length, uint32_t ppid)
michael@0 2195 {
michael@0 2196 uint16_t flags;
michael@0 2197 struct sctp_sendv_spa spa;
michael@0 2198 int32_t result;
michael@0 2199
michael@0 2200 NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
michael@0 2201 NS_WARN_IF_FALSE(length > 0, "Length is 0?!");
michael@0 2202
michael@0 2203 // To avoid problems where an in-order OPEN is lost and an
michael@0 2204 // out-of-order data message "beats" it, require data to be in-order
michael@0 2205 // until we get an ACK.
michael@0 2206 if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
michael@0 2207 !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
michael@0 2208 flags = SCTP_UNORDERED;
michael@0 2209 } else {
michael@0 2210 flags = 0;
michael@0 2211 }
michael@0 2212
michael@0 2213 spa.sendv_sndinfo.snd_ppid = htonl(ppid);
michael@0 2214 spa.sendv_sndinfo.snd_sid = channel->mStream;
michael@0 2215 spa.sendv_sndinfo.snd_flags = flags;
michael@0 2216 spa.sendv_sndinfo.snd_context = 0;
michael@0 2217 spa.sendv_sndinfo.snd_assoc_id = 0;
michael@0 2218 spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
michael@0 2219
michael@0 2220 if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
michael@0 2221 spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
michael@0 2222 spa.sendv_prinfo.pr_value = channel->mPrValue;
michael@0 2223 spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
michael@0 2224 }
michael@0 2225
michael@0 2226 // Note: Main-thread IO, but doesn't block!
michael@0 2227 // XXX FIX! to deal with heavy overruns of JS trying to pass data in
michael@0 2228 // (more than the buffersize) queue data onto another thread to do the
michael@0 2229 // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
michael@0 2230
michael@0 2231 // SCTP will return EMSGSIZE if the message is bigger than the buffer
michael@0 2232 // size (or EAGAIN if there isn't space)
michael@0 2233 if (channel->mBufferedData.IsEmpty()) {
michael@0 2234 result = usrsctp_sendv(mSocket, data, length,
michael@0 2235 nullptr, 0,
michael@0 2236 (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
michael@0 2237 SCTP_SENDV_SPA, 0);
michael@0 2238 LOG(("Sent buffer (len=%u), result=%d", length, result));
michael@0 2239 } else {
michael@0 2240 // Fake EAGAIN if we're already buffering data
michael@0 2241 result = -1;
michael@0 2242 errno = EAGAIN;
michael@0 2243 }
michael@0 2244 if (result < 0) {
michael@0 2245 if (errno == EAGAIN || errno == EWOULDBLOCK) {
michael@0 2246 // queue data for resend! And queue any further data for the stream until it is...
michael@0 2247 BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc
michael@0 2248 channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
michael@0 2249 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
michael@0 2250 LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
michael@0 2251 StartDefer();
michael@0 2252 return 0;
michael@0 2253 }
michael@0 2254 LOG(("error %d sending string", errno));
michael@0 2255 }
michael@0 2256 return result;
michael@0 2257 }
michael@0 2258
michael@0 2259 // Handles fragmenting binary messages
michael@0 2260 int32_t
michael@0 2261 DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
michael@0 2262 uint32_t len,
michael@0 2263 uint32_t ppid_partial, uint32_t ppid_final)
michael@0 2264 {
michael@0 2265 // Since there's a limit on network buffer size and no limits on message
michael@0 2266 // size, and we don't want to use EOR mode (multiple writes for a
michael@0 2267 // message, but all other streams are blocked until you finish sending
michael@0 2268 // this message), we need to add application-level fragmentation of large
michael@0 2269 // messages. On a reliable channel, these can be simply rebuilt into a
michael@0 2270 // large message. On an unreliable channel, we can't and don't know how
michael@0 2271 // long to wait, and there are no retransmissions, and no easy way to
michael@0 2272 // tell the user "this part is missing", so on unreliable channels we
michael@0 2273 // need to return an error if sending more bytes than the network buffers
michael@0 2274 // can hold, and perhaps a lower number.
michael@0 2275
michael@0 2276 // We *really* don't want to do this from main thread! - and SendMsgInternal
michael@0 2277 // avoids blocking.
michael@0 2278 // This MUST be reliable and in-order for the reassembly to work
michael@0 2279 if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
michael@0 2280 channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
michael@0 2281 !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
michael@0 2282 int32_t sent=0;
michael@0 2283 uint32_t origlen = len;
michael@0 2284 LOG(("Sending binary message length %u in chunks", len));
michael@0 2285 // XXX check flags for out-of-order, or force in-order for large binary messages
michael@0 2286 while (len > 0) {
michael@0 2287 uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
michael@0 2288 uint32_t ppid;
michael@0 2289 len -= sendlen;
michael@0 2290 ppid = len > 0 ? ppid_partial : ppid_final;
michael@0 2291 LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
michael@0 2292 // Note that these might end up being deferred and queued.
michael@0 2293 sent += SendMsgInternal(channel, data, sendlen, ppid);
michael@0 2294 data += sendlen;
michael@0 2295 }
michael@0 2296 LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
michael@0 2297 (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
michael@0 2298 origlen, sent,
michael@0 2299 channel->mBufferedData.Length()));
michael@0 2300 return sent;
michael@0 2301 }
michael@0 2302 NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
michael@0 2303 "Sending too-large data on unreliable channel!");
michael@0 2304
michael@0 2305 // This will fail if the message is too large (default 256K)
michael@0 2306 return SendMsgInternal(channel, data, len, ppid_final);
michael@0 2307 }
michael@0 2308
michael@0 2309 class ReadBlobRunnable : public nsRunnable {
michael@0 2310 public:
michael@0 2311 ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
michael@0 2312 nsIInputStream* aBlob) :
michael@0 2313 mConnection(aConnection),
michael@0 2314 mStream(aStream),
michael@0 2315 mBlob(aBlob)
michael@0 2316 { }
michael@0 2317
michael@0 2318 NS_IMETHODIMP Run() {
michael@0 2319 // ReadBlob() is responsible to releasing the reference
michael@0 2320 DataChannelConnection *self = mConnection;
michael@0 2321 self->ReadBlob(mConnection.forget(), mStream, mBlob);
michael@0 2322 return NS_OK;
michael@0 2323 }
michael@0 2324
michael@0 2325 private:
michael@0 2326 // Make sure the Connection doesn't die while there are jobs outstanding.
michael@0 2327 // Let it die (if released by PeerConnectionImpl while we're running)
michael@0 2328 // when we send our runnable back to MainThread. Then ~DataChannelConnection
michael@0 2329 // can send the IOThread to MainThread to die in a runnable, avoiding
michael@0 2330 // unsafe event loop recursion. Evil.
michael@0 2331 nsRefPtr<DataChannelConnection> mConnection;
michael@0 2332 uint16_t mStream;
michael@0 2333 // Use RefCount for preventing the object is deleted when SendBlob returns.
michael@0 2334 nsRefPtr<nsIInputStream> mBlob;
michael@0 2335 };
michael@0 2336
michael@0 2337 int32_t
michael@0 2338 DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
michael@0 2339 {
michael@0 2340 DataChannel *channel = mStreams[stream];
michael@0 2341 NS_ENSURE_TRUE(channel, 0);
michael@0 2342 // Spawn a thread to send the data
michael@0 2343 if (!mInternalIOThread) {
michael@0 2344 nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
michael@0 2345 if (NS_FAILED(res)) {
michael@0 2346 return -1;
michael@0 2347 }
michael@0 2348 }
michael@0 2349
michael@0 2350 nsCOMPtr<nsIRunnable> runnable = new ReadBlobRunnable(this, stream, aBlob);
michael@0 2351 mInternalIOThread->Dispatch(runnable, NS_DISPATCH_NORMAL);
michael@0 2352 return 0;
michael@0 2353 }
michael@0 2354
michael@0 2355 void
michael@0 2356 DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
michael@0 2357 uint16_t aStream, nsIInputStream* aBlob)
michael@0 2358 {
michael@0 2359 // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
michael@0 2360 // it off mainthread; if PeerConnectionImpl has released then we want
michael@0 2361 // ~DataChannelConnection() to run on MainThread
michael@0 2362
michael@0 2363 // XXX to do this safely, we must enqueue these atomically onto the
michael@0 2364 // output socket. We need a sender thread(s?) to enque data into the
michael@0 2365 // socket and to avoid main-thread IO that might block. Even on a
michael@0 2366 // background thread, we may not want to block on one stream's data.
michael@0 2367 // I.e. run non-blocking and service multiple channels.
michael@0 2368
michael@0 2369 // For now as a hack, send as a single blast of queued packets which may
michael@0 2370 // be deferred until buffer space is available.
michael@0 2371 nsCString temp;
michael@0 2372 uint64_t len;
michael@0 2373 nsCOMPtr<nsIThread> mainThread;
michael@0 2374 NS_GetMainThread(getter_AddRefs(mainThread));
michael@0 2375
michael@0 2376 if (NS_FAILED(aBlob->Available(&len)) ||
michael@0 2377 NS_FAILED(NS_ReadInputStreamToString(aBlob, temp, len))) {
michael@0 2378 // Bug 966602: Doesn't return an error to the caller via onerror.
michael@0 2379 // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
michael@0 2380 NS_ProxyRelease(mainThread, aThis.take());
michael@0 2381 return;
michael@0 2382 }
michael@0 2383 aBlob->Close();
michael@0 2384 RUN_ON_THREAD(mainThread, WrapRunnable(nsRefPtr<DataChannelConnection>(aThis),
michael@0 2385 &DataChannelConnection::SendBinaryMsg,
michael@0 2386 aStream, temp),
michael@0 2387 NS_DISPATCH_NORMAL);
michael@0 2388 }
michael@0 2389
michael@0 2390 void
michael@0 2391 DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
michael@0 2392 {
michael@0 2393 ASSERT_WEBRTC(NS_IsMainThread());
michael@0 2394 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
michael@0 2395 if (mStreams[i]) {
michael@0 2396 aStreamList->push_back(mStreams[i]->mStream);
michael@0 2397 }
michael@0 2398 }
michael@0 2399 }
michael@0 2400
michael@0 2401 int32_t
michael@0 2402 DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
michael@0 2403 bool isBinary)
michael@0 2404 {
michael@0 2405 ASSERT_WEBRTC(NS_IsMainThread());
michael@0 2406 // We really could allow this from other threads, so long as we deal with
michael@0 2407 // asynchronosity issues with channels closing, in particular access to
michael@0 2408 // mStreams, and issues with the association closing (access to mSocket).
michael@0 2409
michael@0 2410 const char *data = aMsg.BeginReading();
michael@0 2411 uint32_t len = aMsg.Length();
michael@0 2412 DataChannel *channel;
michael@0 2413
michael@0 2414 LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
michael@0 2415 // XXX if we want more efficiency, translate flags once at open time
michael@0 2416 channel = mStreams[stream];
michael@0 2417 NS_ENSURE_TRUE(channel, 0);
michael@0 2418
michael@0 2419 if (isBinary)
michael@0 2420 return SendBinary(channel, data, len,
michael@0 2421 DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
michael@0 2422 return SendBinary(channel, data, len,
michael@0 2423 DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
michael@0 2424 }
michael@0 2425
michael@0 2426 void
michael@0 2427 DataChannelConnection::Close(DataChannel *aChannel)
michael@0 2428 {
michael@0 2429 MutexAutoLock lock(mLock);
michael@0 2430 CloseInt(aChannel);
michael@0 2431 }
michael@0 2432
michael@0 2433 // So we can call Close() with the lock already held
michael@0 2434 // Called from someone who holds a ref via ::Close(), or from ~DataChannel
michael@0 2435 void
michael@0 2436 DataChannelConnection::CloseInt(DataChannel *aChannel)
michael@0 2437 {
michael@0 2438 MOZ_ASSERT(aChannel);
michael@0 2439 nsRefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
michael@0 2440
michael@0 2441 mLock.AssertCurrentThreadOwns();
michael@0 2442 LOG(("Connection %p/Channel %p: Closing stream %u",
michael@0 2443 channel->mConnection.get(), channel.get(), channel->mStream));
michael@0 2444 // re-test since it may have closed before the lock was grabbed
michael@0 2445 if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
michael@0 2446 LOG(("Channel already closing/closed (%u)", aChannel->mState));
michael@0 2447 if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
michael@0 2448 // called from CloseAll()
michael@0 2449 // we're not going to hang around waiting any more
michael@0 2450 mStreams[channel->mStream] = nullptr;
michael@0 2451 }
michael@0 2452 return;
michael@0 2453 }
michael@0 2454 aChannel->mBufferedData.Clear();
michael@0 2455 if (channel->mStream != INVALID_STREAM) {
michael@0 2456 ResetOutgoingStream(channel->mStream);
michael@0 2457 if (mState == CLOSED) { // called from CloseAll()
michael@0 2458 // Let resets accumulate then send all at once in CloseAll()
michael@0 2459 // we're not going to hang around waiting
michael@0 2460 mStreams[channel->mStream] = nullptr;
michael@0 2461 } else {
michael@0 2462 SendOutgoingStreamReset();
michael@0 2463 }
michael@0 2464 }
michael@0 2465 aChannel->mState = CLOSING;
michael@0 2466 if (mState == CLOSED) {
michael@0 2467 // we're not going to hang around waiting
michael@0 2468 channel->Destroy();
michael@0 2469 }
michael@0 2470 // At this point when we leave here, the object is a zombie held alive only by the DOM object
michael@0 2471 }
michael@0 2472
michael@0 2473 void DataChannelConnection::CloseAll()
michael@0 2474 {
michael@0 2475 LOG(("Closing all channels (connection %p)", (void*) this));
michael@0 2476 // Don't need to lock here
michael@0 2477
michael@0 2478 // Make sure no more channels will be opened
michael@0 2479 {
michael@0 2480 MutexAutoLock lock(mLock);
michael@0 2481 mState = CLOSED;
michael@0 2482 }
michael@0 2483
michael@0 2484 // Close current channels
michael@0 2485 // If there are runnables, they hold a strong ref and keep the channel
michael@0 2486 // and/or connection alive (even if in a CLOSED state)
michael@0 2487 bool closed_some = false;
michael@0 2488 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
michael@0 2489 if (mStreams[i]) {
michael@0 2490 mStreams[i]->Close();
michael@0 2491 closed_some = true;
michael@0 2492 }
michael@0 2493 }
michael@0 2494
michael@0 2495 // Clean up any pending opens for channels
michael@0 2496 nsRefPtr<DataChannel> channel;
michael@0 2497 while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
michael@0 2498 LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
michael@0 2499 channel->Close(); // also releases the ref on each iteration
michael@0 2500 closed_some = true;
michael@0 2501 }
michael@0 2502 // It's more efficient to let the Resets queue in shutdown and then
michael@0 2503 // SendOutgoingStreamReset() here.
michael@0 2504 if (closed_some) {
michael@0 2505 MutexAutoLock lock(mLock);
michael@0 2506 SendOutgoingStreamReset();
michael@0 2507 }
michael@0 2508 }
michael@0 2509
michael@0 2510 DataChannel::~DataChannel()
michael@0 2511 {
michael@0 2512 // NS_ASSERTION since this is more "I think I caught all the cases that
michael@0 2513 // can cause this" than a true kill-the-program assertion. If this is
michael@0 2514 // wrong, nothing bad happens. A worst it's a leak.
michael@0 2515 NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
michael@0 2516 }
michael@0 2517
michael@0 2518 void
michael@0 2519 DataChannel::Close()
michael@0 2520 {
michael@0 2521 ENSURE_DATACONNECTION;
michael@0 2522 mConnection->Close(this);
michael@0 2523 }
michael@0 2524
michael@0 2525 // Used when disconnecting from the DataChannelConnection
michael@0 2526 void
michael@0 2527 DataChannel::Destroy()
michael@0 2528 {
michael@0 2529 ENSURE_DATACONNECTION;
michael@0 2530
michael@0 2531 LOG(("Destroying Data channel %u", mStream));
michael@0 2532 MOZ_ASSERT_IF(mStream != INVALID_STREAM,
michael@0 2533 !mConnection->FindChannelByStream(mStream));
michael@0 2534 mStream = INVALID_STREAM;
michael@0 2535 mState = CLOSED;
michael@0 2536 mConnection = nullptr;
michael@0 2537 }
michael@0 2538
michael@0 2539 void
michael@0 2540 DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
michael@0 2541 {
michael@0 2542 MutexAutoLock mLock(mListenerLock);
michael@0 2543 mContext = aContext;
michael@0 2544 mListener = aListener;
michael@0 2545 }
michael@0 2546
michael@0 2547 // May be called from another (i.e. Main) thread!
michael@0 2548 void
michael@0 2549 DataChannel::AppReady()
michael@0 2550 {
michael@0 2551 ENSURE_DATACONNECTION;
michael@0 2552
michael@0 2553 MutexAutoLock lock(mConnection->mLock);
michael@0 2554
michael@0 2555 mReady = true;
michael@0 2556 if (mState == WAITING_TO_OPEN) {
michael@0 2557 mState = OPEN;
michael@0 2558 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
michael@0 2559 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
michael@0 2560 this));
michael@0 2561 for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
michael@0 2562 nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
michael@0 2563 MOZ_ASSERT(runnable);
michael@0 2564 NS_DispatchToMainThread(runnable);
michael@0 2565 }
michael@0 2566 } else {
michael@0 2567 NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
michael@0 2568 }
michael@0 2569 mQueuedMessages.Clear();
michael@0 2570 mQueuedMessages.Compact();
michael@0 2571 // We never use it again... We could even allocate the array in the odd
michael@0 2572 // cases we need it.
michael@0 2573 }
michael@0 2574
michael@0 2575 uint32_t
michael@0 2576 DataChannel::GetBufferedAmount()
michael@0 2577 {
michael@0 2578 uint32_t buffered = 0;
michael@0 2579 for (uint32_t i = 0; i < mBufferedData.Length(); ++i) {
michael@0 2580 buffered += mBufferedData[i]->mLength;
michael@0 2581 }
michael@0 2582 return buffered;
michael@0 2583 }
michael@0 2584
michael@0 2585 // Called with mLock locked!
michael@0 2586 void
michael@0 2587 DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
michael@0 2588 {
michael@0 2589 if (!mReady &&
michael@0 2590 (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
michael@0 2591 mQueuedMessages.AppendElement(aMessage);
michael@0 2592 } else {
michael@0 2593 NS_DispatchToMainThread(aMessage);
michael@0 2594 }
michael@0 2595 }
michael@0 2596
michael@0 2597 } // namespace mozilla

mercurial