1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/netwerk/sctp/datachannel/DataChannel.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,2597 @@ 1.4 +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 1.5 +/* vim: set ts=2 et sw=2 tw=80: */ 1.6 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.7 + * License, v. 2.0. If a copy of the MPL was not distributed with this file, 1.8 + * You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.9 + 1.10 +#include <stdio.h> 1.11 +#include <stdlib.h> 1.12 +#if !defined(__Userspace_os_Windows) 1.13 +#include <arpa/inet.h> 1.14 +#endif 1.15 +// usrsctp.h expects to have errno definitions prior to its inclusion. 1.16 +#include <errno.h> 1.17 + 1.18 +#define SCTP_DEBUG 1 1.19 +#define SCTP_STDINT_INCLUDE <stdint.h> 1.20 + 1.21 +#ifdef _MSC_VER 1.22 +// Disable "warning C4200: nonstandard extension used : zero-sized array in 1.23 +// struct/union" 1.24 +// ...which the third-party file usrsctp.h runs afoul of. 1.25 +#pragma warning(push) 1.26 +#pragma warning(disable:4200) 1.27 +#endif 1.28 + 1.29 +#include "usrsctp.h" 1.30 + 1.31 +#ifdef _MSC_VER 1.32 +#pragma warning(pop) 1.33 +#endif 1.34 + 1.35 +#include "DataChannelLog.h" 1.36 + 1.37 +#include "nsServiceManagerUtils.h" 1.38 +#include "nsIObserverService.h" 1.39 +#include "nsIObserver.h" 1.40 +#include "mozilla/Services.h" 1.41 +#include "nsProxyRelease.h" 1.42 +#include "nsThread.h" 1.43 +#include "nsThreadUtils.h" 1.44 +#include "nsAutoPtr.h" 1.45 +#include "nsNetUtil.h" 1.46 +#include "mozilla/StaticPtr.h" 1.47 +#ifdef MOZ_PEERCONNECTION 1.48 +#include "mtransport/runnable_utils.h" 1.49 +#endif 1.50 + 1.51 +#define DATACHANNEL_LOG(args) LOG(args) 1.52 +#include "DataChannel.h" 1.53 +#include "DataChannelProtocol.h" 1.54 + 1.55 +#ifdef PR_LOGGING 1.56 +PRLogModuleInfo* 1.57 +GetDataChannelLog() 1.58 +{ 1.59 + static PRLogModuleInfo* sLog; 1.60 + if (!sLog) 1.61 + sLog = PR_NewLogModule("DataChannel"); 1.62 + return sLog; 1.63 +} 1.64 + 1.65 +PRLogModuleInfo* 1.66 +GetSCTPLog() 1.67 +{ 1.68 + static PRLogModuleInfo* sLog; 1.69 + if (!sLog) 1.70 + sLog = PR_NewLogModule("SCTP"); 1.71 + return sLog; 1.72 +} 1.73 +#endif 1.74 + 1.75 +// Let us turn on and off important assertions in non-debug builds 1.76 +#ifdef DEBUG 1.77 +#define ASSERT_WEBRTC(x) MOZ_ASSERT((x)) 1.78 +#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS) 1.79 +#define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0) 1.80 +#endif 1.81 + 1.82 +static bool sctp_initialized; 1.83 + 1.84 +namespace mozilla { 1.85 + 1.86 +class DataChannelShutdown; 1.87 +StaticRefPtr<DataChannelShutdown> gDataChannelShutdown; 1.88 + 1.89 +class DataChannelShutdown : public nsIObserver 1.90 +{ 1.91 +public: 1.92 + // This needs to be tied to some form object that is guaranteed to be 1.93 + // around (singleton likely) unless we want to shutdown sctp whenever 1.94 + // we're not using it (and in which case we'd keep a refcnt'd object 1.95 + // ref'd by each DataChannelConnection to release the SCTP usrlib via 1.96 + // sctp_finish) 1.97 + 1.98 + NS_DECL_ISUPPORTS 1.99 + 1.100 + DataChannelShutdown() {} 1.101 + 1.102 + void Init() 1.103 + { 1.104 + nsCOMPtr<nsIObserverService> observerService = 1.105 + mozilla::services::GetObserverService(); 1.106 + if (!observerService) 1.107 + return; 1.108 + 1.109 + nsresult rv = observerService->AddObserver(this, 1.110 + "profile-change-net-teardown", 1.111 + false); 1.112 + MOZ_ASSERT(rv == NS_OK); 1.113 + (void) rv; 1.114 + } 1.115 + 1.116 + virtual ~DataChannelShutdown() 1.117 + { 1.118 + nsCOMPtr<nsIObserverService> observerService = 1.119 + mozilla::services::GetObserverService(); 1.120 + if (observerService) 1.121 + observerService->RemoveObserver(this, "profile-change-net-teardown"); 1.122 + } 1.123 + 1.124 + NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic, 1.125 + const char16_t* aData) { 1.126 + if (strcmp(aTopic, "profile-change-net-teardown") == 0) { 1.127 + LOG(("Shutting down SCTP")); 1.128 + if (sctp_initialized) { 1.129 + usrsctp_finish(); 1.130 + sctp_initialized = false; 1.131 + } 1.132 + nsCOMPtr<nsIObserverService> observerService = 1.133 + mozilla::services::GetObserverService(); 1.134 + if (!observerService) 1.135 + return NS_ERROR_FAILURE; 1.136 + 1.137 + nsresult rv = observerService->RemoveObserver(this, 1.138 + "profile-change-net-teardown"); 1.139 + MOZ_ASSERT(rv == NS_OK); 1.140 + (void) rv; 1.141 + 1.142 + nsRefPtr<DataChannelShutdown> kungFuDeathGrip(this); 1.143 + gDataChannelShutdown = nullptr; 1.144 + } 1.145 + return NS_OK; 1.146 + } 1.147 +}; 1.148 + 1.149 +NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver); 1.150 + 1.151 + 1.152 +BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data, 1.153 + uint32_t length) : mLength(length) 1.154 +{ 1.155 + mSpa = new sctp_sendv_spa; 1.156 + *mSpa = spa; 1.157 + char *tmp = new char[length]; // infallible malloc! 1.158 + memcpy(tmp, data, length); 1.159 + mData = tmp; 1.160 +} 1.161 + 1.162 +BufferedMsg::~BufferedMsg() 1.163 +{ 1.164 + delete mSpa; 1.165 + delete mData; 1.166 +} 1.167 + 1.168 +static int 1.169 +receive_cb(struct socket* sock, union sctp_sockstore addr, 1.170 + void *data, size_t datalen, 1.171 + struct sctp_rcvinfo rcv, int flags, void *ulp_info) 1.172 +{ 1.173 + DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info); 1.174 + return connection->ReceiveCallback(sock, data, datalen, rcv, flags); 1.175 +} 1.176 + 1.177 +#ifdef PR_LOGGING 1.178 +static void 1.179 +debug_printf(const char *format, ...) 1.180 +{ 1.181 + va_list ap; 1.182 + char buffer[1024]; 1.183 + 1.184 + if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) { 1.185 + va_start(ap, format); 1.186 +#ifdef _WIN32 1.187 + if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) { 1.188 +#else 1.189 + if (vsnprintf(buffer, sizeof(buffer), format, ap) > 0) { 1.190 +#endif 1.191 + PR_LogPrint("%s", buffer); 1.192 + } 1.193 + va_end(ap); 1.194 + } 1.195 +} 1.196 +#endif 1.197 + 1.198 +DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) : 1.199 + mLock("netwerk::sctp::DataChannelConnection") 1.200 +{ 1.201 + mState = CLOSED; 1.202 + mSocket = nullptr; 1.203 + mMasterSocket = nullptr; 1.204 + mListener = listener->asWeakPtr(); 1.205 + mLocalPort = 0; 1.206 + mRemotePort = 0; 1.207 + mDeferTimeout = 10; 1.208 + mTimerRunning = false; 1.209 + LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get())); 1.210 + mInternalIOThread = nullptr; 1.211 +} 1.212 + 1.213 +DataChannelConnection::~DataChannelConnection() 1.214 +{ 1.215 + LOG(("Deleting DataChannelConnection %p", (void *) this)); 1.216 + // This may die on the MainThread, or on the STS thread 1.217 + ASSERT_WEBRTC(mState == CLOSED); 1.218 + MOZ_ASSERT(!mMasterSocket); 1.219 + MOZ_ASSERT(mPending.GetSize() == 0); 1.220 + 1.221 + // Already disconnected from sigslot/mTransportFlow 1.222 + // TransportFlows must be released from the STS thread 1.223 + if (!IsSTSThread()) { 1.224 + ASSERT_WEBRTC(NS_IsMainThread()); 1.225 + if (mTransportFlow) { 1.226 + ASSERT_WEBRTC(mSTS); 1.227 + NS_ProxyRelease(mSTS, mTransportFlow); 1.228 + } 1.229 + 1.230 + if (mInternalIOThread) { 1.231 + // Avoid spinning the event thread from here (which if we're mainthread 1.232 + // is in the event loop already) 1.233 + NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread), 1.234 + &nsIThread::Shutdown), 1.235 + NS_DISPATCH_NORMAL); 1.236 + } 1.237 + } else { 1.238 + // on STS, safe to call shutdown 1.239 + if (mInternalIOThread) { 1.240 + mInternalIOThread->Shutdown(); 1.241 + } 1.242 + } 1.243 +} 1.244 + 1.245 +void 1.246 +DataChannelConnection::Destroy() 1.247 +{ 1.248 + // Though it's probably ok to do this and close the sockets; 1.249 + // if we really want it to do true clean shutdowns it can 1.250 + // create a dependant Internal object that would remain around 1.251 + // until the network shut down the association or timed out. 1.252 + LOG(("Destroying DataChannelConnection %p", (void *) this)); 1.253 + ASSERT_WEBRTC(NS_IsMainThread()); 1.254 + CloseAll(); 1.255 + 1.256 + MutexAutoLock lock(mLock); 1.257 + // If we had a pending reset, we aren't waiting for it - clear the list so 1.258 + // we can deregister this DataChannelConnection without leaking. 1.259 + ClearResets(); 1.260 + 1.261 + MOZ_ASSERT(mSTS); 1.262 + ASSERT_WEBRTC(NS_IsMainThread()); 1.263 + // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed, 1.264 + // the usrsctp_close() calls can move back here (and just proxy the 1.265 + // disconnect_all()) 1.266 + RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this), 1.267 + &DataChannelConnection::DestroyOnSTS, 1.268 + mSocket, mMasterSocket), 1.269 + NS_DISPATCH_NORMAL); 1.270 + 1.271 + // These will be released on STS 1.272 + mSocket = nullptr; 1.273 + mMasterSocket = nullptr; // also a flag that we've Destroyed this connection 1.274 + 1.275 + // Must do this in Destroy() since we may then delete this object 1.276 + if (mUsingDtls) { 1.277 + usrsctp_deregister_address(static_cast<void *>(this)); 1.278 + LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this))); 1.279 + } 1.280 + 1.281 + // We can't get any more new callbacks from the SCTP library 1.282 + // All existing callbacks have refs to DataChannelConnection 1.283 + 1.284 + // nsDOMDataChannel objects have refs to DataChannels that have refs to us 1.285 +} 1.286 + 1.287 +void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket, 1.288 + struct socket *aSocket) 1.289 +{ 1.290 + if (aSocket && aSocket != aMasterSocket) 1.291 + usrsctp_close(aSocket); 1.292 + if (aMasterSocket) 1.293 + usrsctp_close(aMasterSocket); 1.294 + 1.295 + disconnect_all(); 1.296 +} 1.297 + 1.298 +NS_IMPL_ISUPPORTS(DataChannelConnection, 1.299 + nsITimerCallback) 1.300 + 1.301 +bool 1.302 +DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls) 1.303 +{ 1.304 + struct sctp_initmsg initmsg; 1.305 + struct sctp_udpencaps encaps; 1.306 + struct sctp_assoc_value av; 1.307 + struct sctp_event event; 1.308 + socklen_t len; 1.309 + 1.310 + uint16_t event_types[] = {SCTP_ASSOC_CHANGE, 1.311 + SCTP_PEER_ADDR_CHANGE, 1.312 + SCTP_REMOTE_ERROR, 1.313 + SCTP_SHUTDOWN_EVENT, 1.314 + SCTP_ADAPTATION_INDICATION, 1.315 + SCTP_SEND_FAILED_EVENT, 1.316 + SCTP_STREAM_RESET_EVENT, 1.317 + SCTP_STREAM_CHANGE_EVENT}; 1.318 + { 1.319 + ASSERT_WEBRTC(NS_IsMainThread()); 1.320 + 1.321 + // MutexAutoLock lock(mLock); Not needed since we're on mainthread always 1.322 + if (!sctp_initialized) { 1.323 + if (aUsingDtls) { 1.324 + LOG(("sctp_init(DTLS)")); 1.325 +#ifdef MOZ_PEERCONNECTION 1.326 + usrsctp_init(0, 1.327 + DataChannelConnection::SctpDtlsOutput, 1.328 +#ifdef PR_LOGGING 1.329 + debug_printf 1.330 +#else 1.331 + nullptr 1.332 +#endif 1.333 + ); 1.334 +#else 1.335 + NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport"); 1.336 +#endif 1.337 + } else { 1.338 + LOG(("sctp_init(%u)", aPort)); 1.339 + usrsctp_init(aPort, 1.340 + nullptr, 1.341 +#ifdef PR_LOGGING 1.342 + debug_printf 1.343 +#else 1.344 + nullptr 1.345 +#endif 1.346 + ); 1.347 + } 1.348 + 1.349 +#ifdef PR_LOGGING 1.350 + // Set logging to SCTP:PR_LOG_DEBUG to get SCTP debugs 1.351 + if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) { 1.352 + usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); 1.353 + } 1.354 +#endif 1.355 + usrsctp_sysctl_set_sctp_blackhole(2); 1.356 + // ECN is currently not supported by the Firefox code 1.357 + usrsctp_sysctl_set_sctp_ecn_enable(0); 1.358 + sctp_initialized = true; 1.359 + 1.360 + gDataChannelShutdown = new DataChannelShutdown(); 1.361 + gDataChannelShutdown->Init(); 1.362 + } 1.363 + } 1.364 + 1.365 + // XXX FIX! make this a global we get once 1.366 + // Find the STS thread 1.367 + nsresult rv; 1.368 + mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); 1.369 + MOZ_ASSERT(NS_SUCCEEDED(rv)); 1.370 + 1.371 + // Open sctp with a callback 1.372 + if ((mMasterSocket = usrsctp_socket( 1.373 + aUsingDtls ? AF_CONN : AF_INET, 1.374 + SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) { 1.375 + return false; 1.376 + } 1.377 + 1.378 + // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking 1.379 + // in associations for normal IO 1.380 + if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) { 1.381 + LOG(("Couldn't set non_blocking on SCTP socket")); 1.382 + // We can't handle connect() safely if it will block, not that this will 1.383 + // even happen. 1.384 + goto error_cleanup; 1.385 + } 1.386 + 1.387 + // Make sure when we close the socket, make sure it doesn't call us back again! 1.388 + // This would cause it try to use an invalid DataChannelConnection pointer 1.389 + struct linger l; 1.390 + l.l_onoff = 1; 1.391 + l.l_linger = 0; 1.392 + if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, 1.393 + (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { 1.394 + LOG(("Couldn't set SO_LINGER on SCTP socket")); 1.395 + // unsafe to allow it to continue if this fails 1.396 + goto error_cleanup; 1.397 + } 1.398 + 1.399 + // XXX Consider disabling this when we add proper SDP negotiation. 1.400 + // We may want to leave enabled for supporting 'cloning' of SDP offers, which 1.401 + // implies re-use of the same pseudo-port number, or forcing a renegotiation. 1.402 + { 1.403 + uint32_t on = 1; 1.404 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT, 1.405 + (const void *)&on, (socklen_t)sizeof(on)) < 0) { 1.406 + LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket")); 1.407 + } 1.408 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY, 1.409 + (const void *)&on, (socklen_t)sizeof(on)) < 0) { 1.410 + LOG(("Couldn't set SCTP_NODELAY on SCTP socket")); 1.411 + } 1.412 + } 1.413 + 1.414 + if (!aUsingDtls) { 1.415 + memset(&encaps, 0, sizeof(encaps)); 1.416 + encaps.sue_address.ss_family = AF_INET; 1.417 + encaps.sue_port = htons(aPort); 1.418 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT, 1.419 + (const void*)&encaps, 1.420 + (socklen_t)sizeof(struct sctp_udpencaps)) < 0) { 1.421 + LOG(("*** failed encaps errno %d", errno)); 1.422 + goto error_cleanup; 1.423 + } 1.424 + LOG(("SCTP encapsulation local port %d", aPort)); 1.425 + } 1.426 + 1.427 + av.assoc_id = SCTP_ALL_ASSOC; 1.428 + av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; 1.429 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, 1.430 + (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { 1.431 + LOG(("*** failed enable stream reset errno %d", errno)); 1.432 + goto error_cleanup; 1.433 + } 1.434 + 1.435 + /* Enable the events of interest. */ 1.436 + memset(&event, 0, sizeof(event)); 1.437 + event.se_assoc_id = SCTP_ALL_ASSOC; 1.438 + event.se_on = 1; 1.439 + for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) { 1.440 + event.se_type = event_types[i]; 1.441 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) { 1.442 + LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno)); 1.443 + goto error_cleanup; 1.444 + } 1.445 + } 1.446 + 1.447 + // Update number of streams 1.448 + mStreams.AppendElements(aNumStreams); 1.449 + for (uint32_t i = 0; i < aNumStreams; ++i) { 1.450 + mStreams[i] = nullptr; 1.451 + } 1.452 + memset(&initmsg, 0, sizeof(initmsg)); 1.453 + len = sizeof(initmsg); 1.454 + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { 1.455 + LOG(("*** failed getsockopt SCTP_INITMSG")); 1.456 + goto error_cleanup; 1.457 + } 1.458 + LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams, 1.459 + initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); 1.460 + initmsg.sinit_num_ostreams = aNumStreams; 1.461 + initmsg.sinit_max_instreams = MAX_NUM_STREAMS; 1.462 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, 1.463 + (socklen_t)sizeof(initmsg)) < 0) { 1.464 + LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); 1.465 + goto error_cleanup; 1.466 + } 1.467 + 1.468 + mSocket = nullptr; 1.469 + if (aUsingDtls) { 1.470 + mUsingDtls = true; 1.471 + usrsctp_register_address(static_cast<void *>(this)); 1.472 + LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this))); 1.473 + } else { 1.474 + mUsingDtls = false; 1.475 + } 1.476 + return true; 1.477 + 1.478 +error_cleanup: 1.479 + usrsctp_close(mMasterSocket); 1.480 + mMasterSocket = nullptr; 1.481 + mUsingDtls = false; 1.482 + return false; 1.483 +} 1.484 + 1.485 +void 1.486 +DataChannelConnection::StartDefer() 1.487 +{ 1.488 + nsresult rv; 1.489 + if (!NS_IsMainThread()) { 1.490 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.491 + DataChannelOnMessageAvailable::START_DEFER, 1.492 + this, (DataChannel *) nullptr)); 1.493 + return; 1.494 + } 1.495 + 1.496 + ASSERT_WEBRTC(NS_IsMainThread()); 1.497 + if (!mDeferredTimer) { 1.498 + mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); 1.499 + MOZ_ASSERT(mDeferredTimer); 1.500 + } 1.501 + 1.502 + if (!mTimerRunning) { 1.503 + rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, 1.504 + nsITimer::TYPE_ONE_SHOT); 1.505 + NS_ENSURE_TRUE_VOID(rv == NS_OK); 1.506 + 1.507 + mTimerRunning = true; 1.508 + } 1.509 +} 1.510 + 1.511 +// nsITimerCallback 1.512 + 1.513 +NS_IMETHODIMP 1.514 +DataChannelConnection::Notify(nsITimer *timer) 1.515 +{ 1.516 + ASSERT_WEBRTC(NS_IsMainThread()); 1.517 + LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout)); 1.518 + 1.519 + if (timer == mDeferredTimer) { 1.520 + if (SendDeferredMessages()) { 1.521 + // Still blocked 1.522 + // we don't need a lock, since this must be main thread... 1.523 + nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, 1.524 + nsITimer::TYPE_ONE_SHOT); 1.525 + if (NS_FAILED(rv)) { 1.526 + LOG(("%s: cannot initialize open timer", __FUNCTION__)); 1.527 + // XXX and do....? 1.528 + return rv; 1.529 + } 1.530 + mTimerRunning = true; 1.531 + } else { 1.532 + LOG(("Turned off deferred send timer")); 1.533 + mTimerRunning = false; 1.534 + } 1.535 + } 1.536 + return NS_OK; 1.537 +} 1.538 + 1.539 +#ifdef MOZ_PEERCONNECTION 1.540 +void 1.541 +DataChannelConnection::SetEvenOdd() 1.542 +{ 1.543 + ASSERT_WEBRTC(IsSTSThread()); 1.544 + 1.545 + TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>( 1.546 + mTransportFlow->GetLayer(TransportLayerDtls::ID())); 1.547 + MOZ_ASSERT(dtls); // DTLS is mandatory 1.548 + mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT); 1.549 +} 1.550 + 1.551 +bool 1.552 +DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport) 1.553 +{ 1.554 + LOG(("Connect DTLS local %u, remote %u", localport, remoteport)); 1.555 + 1.556 + NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!"); 1.557 + NS_ENSURE_TRUE(aFlow, false); 1.558 + 1.559 + mTransportFlow = aFlow; 1.560 + mLocalPort = localport; 1.561 + mRemotePort = remoteport; 1.562 + mState = CONNECTING; 1.563 + 1.564 + RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this), 1.565 + &DataChannelConnection::SetSignals), 1.566 + NS_DISPATCH_NORMAL); 1.567 + return true; 1.568 +} 1.569 + 1.570 +void 1.571 +DataChannelConnection::SetSignals() 1.572 +{ 1.573 + ASSERT_WEBRTC(IsSTSThread()); 1.574 + ASSERT_WEBRTC(mTransportFlow); 1.575 + LOG(("Setting transport signals, state: %d", mTransportFlow->state())); 1.576 + mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput); 1.577 + // SignalStateChange() doesn't call you with the initial state 1.578 + mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect); 1.579 + CompleteConnect(mTransportFlow, mTransportFlow->state()); 1.580 +} 1.581 + 1.582 +void 1.583 +DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state) 1.584 +{ 1.585 + LOG(("Data transport state: %d", state)); 1.586 + MutexAutoLock lock(mLock); 1.587 + ASSERT_WEBRTC(IsSTSThread()); 1.588 + // We should abort connection on TS_ERROR. 1.589 + // Note however that the association will also fail (perhaps with a delay) and 1.590 + // notify us in that way 1.591 + if (state != TransportLayer::TS_OPEN || !mMasterSocket) 1.592 + return; 1.593 + 1.594 + struct sockaddr_conn addr; 1.595 + memset(&addr, 0, sizeof(addr)); 1.596 + addr.sconn_family = AF_CONN; 1.597 +#if defined(__Userspace_os_Darwin) 1.598 + addr.sconn_len = sizeof(addr); 1.599 +#endif 1.600 + addr.sconn_port = htons(mLocalPort); 1.601 + addr.sconn_addr = static_cast<void *>(this); 1.602 + 1.603 + LOG(("Calling usrsctp_bind")); 1.604 + int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), 1.605 + sizeof(addr)); 1.606 + if (r < 0) { 1.607 + LOG(("usrsctp_bind failed: %d", r)); 1.608 + } else { 1.609 + // This is the remote addr 1.610 + addr.sconn_port = htons(mRemotePort); 1.611 + LOG(("Calling usrsctp_connect")); 1.612 + r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), 1.613 + sizeof(addr)); 1.614 + if (r < 0) { 1.615 + if (errno == EINPROGRESS) { 1.616 + // non-blocking 1.617 + return; 1.618 + } else { 1.619 + LOG(("usrsctp_connect failed: %d", errno)); 1.620 + mState = CLOSED; 1.621 + } 1.622 + } else { 1.623 + // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that 1.624 + // This also avoids issues with calling TransportFlow stuff on Mainthread 1.625 + return; 1.626 + } 1.627 + } 1.628 + // Note: currently this doesn't actually notify the application 1.629 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.630 + DataChannelOnMessageAvailable::ON_CONNECTION, 1.631 + this, false)); 1.632 + return; 1.633 +} 1.634 + 1.635 +// Process any pending Opens 1.636 +void 1.637 +DataChannelConnection::ProcessQueuedOpens() 1.638 +{ 1.639 + // The nsDeque holds channels with an AddRef applied. Another reference 1.640 + // (may) be held by the DOMDataChannel, unless it's been GC'd. No other 1.641 + // references should exist. 1.642 + 1.643 + // Can't copy nsDeque's. Move into temp array since any that fail will 1.644 + // go back to mPending 1.645 + nsDeque temp; 1.646 + DataChannel *temp_channel; // really already_AddRefed<> 1.647 + while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) { 1.648 + temp.Push(static_cast<void *>(temp_channel)); 1.649 + } 1.650 + 1.651 + nsRefPtr<DataChannel> channel; 1.652 + // All these entries have an AddRef(); make that explicit now via the dont_AddRef() 1.653 + while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) { 1.654 + if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { 1.655 + LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream)); 1.656 + channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN; 1.657 + // OpenFinish returns a reference itself, so we need to take it can Release it 1.658 + channel = OpenFinish(channel.forget()); // may reset the flag and re-push 1.659 + } else { 1.660 + NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?"); 1.661 + } 1.662 + } 1.663 + 1.664 +} 1.665 +void 1.666 +DataChannelConnection::SctpDtlsInput(TransportFlow *flow, 1.667 + const unsigned char *data, size_t len) 1.668 +{ 1.669 +#ifdef PR_LOGGING 1.670 + if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) { 1.671 + char *buf; 1.672 + 1.673 + if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) { 1.674 + PR_LogPrint("%s", buf); 1.675 + usrsctp_freedumpbuffer(buf); 1.676 + } 1.677 + } 1.678 +#endif 1.679 + // Pass the data to SCTP 1.680 + usrsctp_conninput(static_cast<void *>(this), data, len, 0); 1.681 +} 1.682 + 1.683 +int 1.684 +DataChannelConnection::SendPacket(const unsigned char *data, size_t len, bool release) 1.685 +{ 1.686 + //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len)); 1.687 + int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0; 1.688 + if (release) 1.689 + delete data; 1.690 + return res; 1.691 +} 1.692 + 1.693 +/* static */ 1.694 +int 1.695 +DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length, 1.696 + uint8_t tos, uint8_t set_df) 1.697 +{ 1.698 + DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr); 1.699 + int res; 1.700 + 1.701 +#ifdef PR_LOGGING 1.702 + if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) { 1.703 + char *buf; 1.704 + 1.705 + if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) { 1.706 + PR_LogPrint("%s", buf); 1.707 + usrsctp_freedumpbuffer(buf); 1.708 + } 1.709 + } 1.710 +#endif 1.711 + // We're async proxying even if on the STSThread because this is called 1.712 + // with internal SCTP locks held in some cases (such as in usrsctp_connect()). 1.713 + // SCTP has an option for Apple, on IP connections only, to release at least 1.714 + // one of the locks before calling a packet output routine; with changes to 1.715 + // the underlying SCTP stack this might remove the need to use an async proxy. 1.716 + if (0 /*peer->IsSTSThread()*/) { 1.717 + res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false); 1.718 + } else { 1.719 + unsigned char *data = new unsigned char[length]; 1.720 + memcpy(data, buffer, length); 1.721 + res = -1; 1.722 + // XXX It might be worthwhile to add an assertion against the thread 1.723 + // somehow getting into the DataChannel/SCTP code again, as 1.724 + // DISPATCH_SYNC is not fully blocking. This may be tricky, as it 1.725 + // needs to be a per-thread check, not a global. 1.726 + peer->mSTS->Dispatch(WrapRunnable( 1.727 + nsRefPtr<DataChannelConnection>(peer), 1.728 + &DataChannelConnection::SendPacket, data, length, true), 1.729 + NS_DISPATCH_NORMAL); 1.730 + res = 0; // cheat! Packets can always be dropped later anyways 1.731 + } 1.732 + return res; 1.733 +} 1.734 +#endif 1.735 + 1.736 +#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT 1.737 +// listen for incoming associations 1.738 +// Blocks! - Don't call this from main thread! 1.739 + 1.740 +#error This code will not work as-is since SetEvenOdd() runs on Mainthread 1.741 + 1.742 +bool 1.743 +DataChannelConnection::Listen(unsigned short port) 1.744 +{ 1.745 + struct sockaddr_in addr; 1.746 + socklen_t addr_len; 1.747 + 1.748 + NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); 1.749 + 1.750 + /* Acting as the 'server' */ 1.751 + memset((void *)&addr, 0, sizeof(addr)); 1.752 +#ifdef HAVE_SIN_LEN 1.753 + addr.sin_len = sizeof(struct sockaddr_in); 1.754 +#endif 1.755 + addr.sin_family = AF_INET; 1.756 + addr.sin_port = htons(port); 1.757 + addr.sin_addr.s_addr = htonl(INADDR_ANY); 1.758 + LOG(("Waiting for connections on port %u", ntohs(addr.sin_port))); 1.759 + mState = CONNECTING; 1.760 + if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) { 1.761 + LOG(("***Failed userspace_bind")); 1.762 + return false; 1.763 + } 1.764 + if (usrsctp_listen(mMasterSocket, 1) < 0) { 1.765 + LOG(("***Failed userspace_listen")); 1.766 + return false; 1.767 + } 1.768 + 1.769 + LOG(("Accepting connection")); 1.770 + addr_len = 0; 1.771 + if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) { 1.772 + LOG(("***Failed accept")); 1.773 + return false; 1.774 + } 1.775 + mState = OPEN; 1.776 + 1.777 + struct linger l; 1.778 + l.l_onoff = 1; 1.779 + l.l_linger = 0; 1.780 + if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, 1.781 + (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { 1.782 + LOG(("Couldn't set SO_LINGER on SCTP socket")); 1.783 + } 1.784 + 1.785 + SetEvenOdd(); 1.786 + 1.787 + // Notify Connection open 1.788 + // XXX We need to make sure connection sticks around until the message is delivered 1.789 + LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); 1.790 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.791 + DataChannelOnMessageAvailable::ON_CONNECTION, 1.792 + this, (DataChannel *) nullptr)); 1.793 + return true; 1.794 +} 1.795 + 1.796 +// Blocks! - Don't call this from main thread! 1.797 +bool 1.798 +DataChannelConnection::Connect(const char *addr, unsigned short port) 1.799 +{ 1.800 + struct sockaddr_in addr4; 1.801 + struct sockaddr_in6 addr6; 1.802 + 1.803 + NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); 1.804 + 1.805 + /* Acting as the connector */ 1.806 + LOG(("Connecting to %s, port %u", addr, port)); 1.807 + memset((void *)&addr4, 0, sizeof(struct sockaddr_in)); 1.808 + memset((void *)&addr6, 0, sizeof(struct sockaddr_in6)); 1.809 +#ifdef HAVE_SIN_LEN 1.810 + addr4.sin_len = sizeof(struct sockaddr_in); 1.811 +#endif 1.812 +#ifdef HAVE_SIN6_LEN 1.813 + addr6.sin6_len = sizeof(struct sockaddr_in6); 1.814 +#endif 1.815 + addr4.sin_family = AF_INET; 1.816 + addr6.sin6_family = AF_INET6; 1.817 + addr4.sin_port = htons(port); 1.818 + addr6.sin6_port = htons(port); 1.819 + mState = CONNECTING; 1.820 + 1.821 +#if !defined(__Userspace_os_Windows) 1.822 + if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) { 1.823 + if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) { 1.824 + LOG(("*** Failed userspace_connect")); 1.825 + return false; 1.826 + } 1.827 + } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) { 1.828 + if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) { 1.829 + LOG(("*** Failed userspace_connect")); 1.830 + return false; 1.831 + } 1.832 + } else { 1.833 + LOG(("*** Illegal destination address.")); 1.834 + } 1.835 +#else 1.836 + { 1.837 + struct sockaddr_storage ss; 1.838 + int sslen = sizeof(ss); 1.839 + 1.840 + if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) { 1.841 + addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr; 1.842 + if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) { 1.843 + LOG(("*** Failed userspace_connect")); 1.844 + return false; 1.845 + } 1.846 + } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) { 1.847 + addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr; 1.848 + if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) { 1.849 + LOG(("*** Failed userspace_connect")); 1.850 + return false; 1.851 + } 1.852 + } else { 1.853 + LOG(("*** Illegal destination address.")); 1.854 + } 1.855 + } 1.856 +#endif 1.857 + 1.858 + mSocket = mMasterSocket; 1.859 + 1.860 + LOG(("connect() succeeded! Entering connected mode")); 1.861 + mState = OPEN; 1.862 + 1.863 + SetEvenOdd(); 1.864 + 1.865 + // Notify Connection open 1.866 + // XXX We need to make sure connection sticks around until the message is delivered 1.867 + LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); 1.868 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.869 + DataChannelOnMessageAvailable::ON_CONNECTION, 1.870 + this, (DataChannel *) nullptr)); 1.871 + return true; 1.872 +} 1.873 +#endif 1.874 + 1.875 +DataChannel * 1.876 +DataChannelConnection::FindChannelByStream(uint16_t stream) 1.877 +{ 1.878 + return mStreams.SafeElementAt(stream); 1.879 +} 1.880 + 1.881 +uint16_t 1.882 +DataChannelConnection::FindFreeStream() 1.883 +{ 1.884 + uint32_t i, j, limit; 1.885 + 1.886 + limit = mStreams.Length(); 1.887 + if (limit > MAX_NUM_STREAMS) 1.888 + limit = MAX_NUM_STREAMS; 1.889 + 1.890 + for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) { 1.891 + if (!mStreams[i]) { 1.892 + // Verify it's not still in the process of closing 1.893 + for (j = 0; j < mStreamsResetting.Length(); ++j) { 1.894 + if (mStreamsResetting[j] == i) { 1.895 + break; 1.896 + } 1.897 + } 1.898 + if (j == mStreamsResetting.Length()) 1.899 + break; 1.900 + } 1.901 + } 1.902 + if (i >= limit) { 1.903 + return INVALID_STREAM; 1.904 + } 1.905 + return i; 1.906 +} 1.907 + 1.908 +bool 1.909 +DataChannelConnection::RequestMoreStreams(int32_t aNeeded) 1.910 +{ 1.911 + struct sctp_status status; 1.912 + struct sctp_add_streams sas; 1.913 + uint32_t outStreamsNeeded; 1.914 + socklen_t len; 1.915 + 1.916 + if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) { 1.917 + aNeeded = MAX_NUM_STREAMS - mStreams.Length(); 1.918 + } 1.919 + if (aNeeded <= 0) { 1.920 + return false; 1.921 + } 1.922 + 1.923 + len = (socklen_t)sizeof(struct sctp_status); 1.924 + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) { 1.925 + LOG(("***failed: getsockopt SCTP_STATUS")); 1.926 + return false; 1.927 + } 1.928 + outStreamsNeeded = aNeeded; // number to add 1.929 + 1.930 + // Note: if multiple channel opens happen when we don't have enough space, 1.931 + // we'll call RequestMoreStreams() multiple times 1.932 + memset(&sas, 0, sizeof(sas)); 1.933 + sas.sas_instrms = 0; 1.934 + sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */ 1.935 + // Doesn't block, we get an event when it succeeds or fails 1.936 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, 1.937 + (socklen_t) sizeof(struct sctp_add_streams)) < 0) { 1.938 + if (errno == EALREADY) { 1.939 + LOG(("Already have %u output streams", outStreamsNeeded)); 1.940 + return true; 1.941 + } 1.942 + 1.943 + LOG(("***failed: setsockopt ADD errno=%d", errno)); 1.944 + return false; 1.945 + } 1.946 + LOG(("Requested %u more streams", outStreamsNeeded)); 1.947 + // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the 1.948 + // values are larger than mStreams.Length() 1.949 + return true; 1.950 +} 1.951 + 1.952 +int32_t 1.953 +DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream) 1.954 +{ 1.955 + struct sctp_sndinfo sndinfo; 1.956 + 1.957 + // Note: Main-thread IO, but doesn't block 1.958 + memset(&sndinfo, 0, sizeof(struct sctp_sndinfo)); 1.959 + sndinfo.snd_sid = stream; 1.960 + sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); 1.961 + if (usrsctp_sendv(mSocket, msg, len, nullptr, 0, 1.962 + &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), 1.963 + SCTP_SENDV_SNDINFO, 0) < 0) { 1.964 + //LOG(("***failed: sctp_sendv")); don't log because errno is a return! 1.965 + return (0); 1.966 + } 1.967 + return (1); 1.968 +} 1.969 + 1.970 +int32_t 1.971 +DataChannelConnection::SendOpenAckMessage(uint16_t stream) 1.972 +{ 1.973 + struct rtcweb_datachannel_ack ack; 1.974 + 1.975 + memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack)); 1.976 + ack.msg_type = DATA_CHANNEL_ACK; 1.977 + 1.978 + return SendControlMessage(&ack, sizeof(ack), stream); 1.979 +} 1.980 + 1.981 +int32_t 1.982 +DataChannelConnection::SendOpenRequestMessage(const nsACString& label, 1.983 + const nsACString& protocol, 1.984 + uint16_t stream, bool unordered, 1.985 + uint16_t prPolicy, uint32_t prValue) 1.986 +{ 1.987 + int label_len = label.Length(); // not including nul 1.988 + int proto_len = protocol.Length(); // not including nul 1.989 + struct rtcweb_datachannel_open_request *req = 1.990 + (struct rtcweb_datachannel_open_request*) moz_xmalloc((sizeof(*req)-1) + label_len + proto_len); 1.991 + // careful - request includes 1 char label 1.992 + 1.993 + memset(req, 0, sizeof(struct rtcweb_datachannel_open_request)); 1.994 + req->msg_type = DATA_CHANNEL_OPEN_REQUEST; 1.995 + switch (prPolicy) { 1.996 + case SCTP_PR_SCTP_NONE: 1.997 + req->channel_type = DATA_CHANNEL_RELIABLE; 1.998 + break; 1.999 + case SCTP_PR_SCTP_TTL: 1.1000 + req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED; 1.1001 + break; 1.1002 + case SCTP_PR_SCTP_RTX: 1.1003 + req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT; 1.1004 + break; 1.1005 + default: 1.1006 + // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno! 1.1007 + moz_free(req); 1.1008 + return (0); 1.1009 + } 1.1010 + if (unordered) { 1.1011 + // Per the current types, all differ by 0x80 between ordered and unordered 1.1012 + req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future 1.1013 + } 1.1014 + 1.1015 + req->reliability_param = htonl(prValue); 1.1016 + req->priority = htons(0); /* XXX: add support */ 1.1017 + req->label_length = htons(label_len); 1.1018 + req->protocol_length = htons(proto_len); 1.1019 + memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len); 1.1020 + memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len); 1.1021 + 1.1022 + // sizeof(*req) already includes +1 byte for label, need nul for both strings 1.1023 + int32_t result = SendControlMessage(req, (sizeof(*req)-1) + label_len + proto_len, stream); 1.1024 + 1.1025 + moz_free(req); 1.1026 + return result; 1.1027 +} 1.1028 + 1.1029 +// XXX This should use a separate thread (outbound queue) which should 1.1030 +// select() to know when to *try* to send data to the socket again. 1.1031 +// Alternatively, it can use a timeout, but that's guaranteed to be wrong 1.1032 +// (just not sure in what direction). We could re-implement NSPR's 1.1033 +// PR_POLL_WRITE/etc handling... with a lot of work. 1.1034 + 1.1035 +// Better yet, use the SCTP stack's notifications on buffer state to avoid 1.1036 +// filling the SCTP's buffers. 1.1037 + 1.1038 +// returns if we're still blocked or not 1.1039 +bool 1.1040 +DataChannelConnection::SendDeferredMessages() 1.1041 +{ 1.1042 + uint32_t i; 1.1043 + nsRefPtr<DataChannel> channel; // we may null out the refs to this 1.1044 + bool still_blocked = false; 1.1045 + bool sent = false; 1.1046 + 1.1047 + // This may block while something is modifying channels, but should not block for IO 1.1048 + MutexAutoLock lock(mLock); 1.1049 + 1.1050 + // XXX For total fairness, on a still_blocked we'd start next time at the 1.1051 + // same index. Sorry, not going to bother for now. 1.1052 + for (i = 0; i < mStreams.Length(); ++i) { 1.1053 + channel = mStreams[i]; 1.1054 + if (!channel) 1.1055 + continue; 1.1056 + 1.1057 + // Only one of these should be set.... 1.1058 + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) { 1.1059 + if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol, 1.1060 + channel->mStream, 1.1061 + channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED, 1.1062 + channel->mPrPolicy, channel->mPrValue)) { 1.1063 + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ; 1.1064 + 1.1065 + channel->mState = OPEN; 1.1066 + channel->mReady = true; 1.1067 + LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); 1.1068 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.1069 + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, 1.1070 + channel)); 1.1071 + sent = true; 1.1072 + } else { 1.1073 + if (errno == EAGAIN || errno == EWOULDBLOCK) { 1.1074 + still_blocked = true; 1.1075 + } else { 1.1076 + // Close the channel, inform the user 1.1077 + mStreams[channel->mStream] = nullptr; 1.1078 + channel->mState = CLOSED; 1.1079 + // Don't need to reset; we didn't open it 1.1080 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.1081 + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, 1.1082 + channel)); 1.1083 + } 1.1084 + } 1.1085 + } 1.1086 + if (still_blocked) 1.1087 + break; 1.1088 + 1.1089 + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) { 1.1090 + if (SendOpenAckMessage(channel->mStream)) { 1.1091 + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK; 1.1092 + sent = true; 1.1093 + } else { 1.1094 + if (errno == EAGAIN || errno == EWOULDBLOCK) { 1.1095 + still_blocked = true; 1.1096 + } else { 1.1097 + // Close the channel, inform the user 1.1098 + CloseInt(channel); 1.1099 + // XXX send error via DataChannelOnMessageAvailable (bug 843625) 1.1100 + } 1.1101 + } 1.1102 + } 1.1103 + if (still_blocked) 1.1104 + break; 1.1105 + 1.1106 + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) { 1.1107 + bool failed_send = false; 1.1108 + int32_t result; 1.1109 + 1.1110 + if (channel->mState == CLOSED || channel->mState == CLOSING) { 1.1111 + channel->mBufferedData.Clear(); 1.1112 + } 1.1113 + while (!channel->mBufferedData.IsEmpty() && 1.1114 + !failed_send) { 1.1115 + struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa; 1.1116 + const char *data = channel->mBufferedData[0]->mData; 1.1117 + uint32_t len = channel->mBufferedData[0]->mLength; 1.1118 + 1.1119 + // SCTP will return EMSGSIZE if the message is bigger than the buffer 1.1120 + // size (or EAGAIN if there isn't space) 1.1121 + if ((result = usrsctp_sendv(mSocket, data, len, 1.1122 + nullptr, 0, 1.1123 + (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa), 1.1124 + SCTP_SENDV_SPA, 1.1125 + 0) < 0)) { 1.1126 + if (errno == EAGAIN || errno == EWOULDBLOCK) { 1.1127 + // leave queued for resend 1.1128 + failed_send = true; 1.1129 + LOG(("queue full again when resending %d bytes (%d)", len, result)); 1.1130 + } else { 1.1131 + LOG(("error %d re-sending string", errno)); 1.1132 + failed_send = true; 1.1133 + } 1.1134 + } else { 1.1135 + LOG(("Resent buffer of %d bytes (%d)", len, result)); 1.1136 + sent = true; 1.1137 + channel->mBufferedData.RemoveElementAt(0); 1.1138 + } 1.1139 + } 1.1140 + if (channel->mBufferedData.IsEmpty()) 1.1141 + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA; 1.1142 + else 1.1143 + still_blocked = true; 1.1144 + } 1.1145 + if (still_blocked) 1.1146 + break; 1.1147 + } 1.1148 + 1.1149 + if (!still_blocked) { 1.1150 + // mDeferTimeout becomes an estimate of how long we need to wait next time we block 1.1151 + return false; 1.1152 + } 1.1153 + // adjust time? More time for next wait if we didn't send anything, less if did 1.1154 + // Pretty crude, but better than nothing; just to keep CPU use down 1.1155 + if (!sent && mDeferTimeout < 50) 1.1156 + mDeferTimeout++; 1.1157 + else if (sent && mDeferTimeout > 10) 1.1158 + mDeferTimeout--; 1.1159 + 1.1160 + return true; 1.1161 +} 1.1162 + 1.1163 +void 1.1164 +DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, 1.1165 + size_t length, 1.1166 + uint16_t stream) 1.1167 +{ 1.1168 + nsRefPtr<DataChannel> channel; 1.1169 + uint32_t prValue; 1.1170 + uint16_t prPolicy; 1.1171 + uint32_t flags; 1.1172 + 1.1173 + mLock.AssertCurrentThreadOwns(); 1.1174 + 1.1175 + if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) { 1.1176 + LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length, 1.1177 + (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))); 1.1178 + if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) 1.1179 + return; 1.1180 + } 1.1181 + 1.1182 + LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req))); 1.1183 + 1.1184 + switch (req->channel_type) { 1.1185 + case DATA_CHANNEL_RELIABLE: 1.1186 + case DATA_CHANNEL_RELIABLE_UNORDERED: 1.1187 + prPolicy = SCTP_PR_SCTP_NONE; 1.1188 + break; 1.1189 + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: 1.1190 + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED: 1.1191 + prPolicy = SCTP_PR_SCTP_RTX; 1.1192 + break; 1.1193 + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: 1.1194 + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED: 1.1195 + prPolicy = SCTP_PR_SCTP_TTL; 1.1196 + break; 1.1197 + default: 1.1198 + LOG(("Unknown channel type", req->channel_type)); 1.1199 + /* XXX error handling */ 1.1200 + return; 1.1201 + } 1.1202 + prValue = ntohl(req->reliability_param); 1.1203 + flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; 1.1204 + 1.1205 + if ((channel = FindChannelByStream(stream))) { 1.1206 + if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { 1.1207 + LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.", 1.1208 + stream, channel->mState)); 1.1209 + /* XXX: some error handling */ 1.1210 + } else { 1.1211 + LOG(("Open for externally negotiated channel %u", stream)); 1.1212 + // XXX should also check protocol, maybe label 1.1213 + if (prPolicy != channel->mPrPolicy || 1.1214 + prValue != channel->mPrValue || 1.1215 + flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) 1.1216 + { 1.1217 + LOG(("WARNING: external negotiation mismatch with OpenRequest:" 1.1218 + "channel %u, policy %u/%u, value %u/%u, flags %x/%x", 1.1219 + stream, prPolicy, channel->mPrPolicy, 1.1220 + prValue, channel->mPrValue, flags, channel->mFlags)); 1.1221 + } 1.1222 + } 1.1223 + return; 1.1224 + } 1.1225 + if (stream >= mStreams.Length()) { 1.1226 + LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length())); 1.1227 + return; 1.1228 + } 1.1229 + 1.1230 + nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length))); 1.1231 + nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)], 1.1232 + ntohs(req->protocol_length))); 1.1233 + 1.1234 + channel = new DataChannel(this, 1.1235 + stream, 1.1236 + DataChannel::CONNECTING, 1.1237 + label, 1.1238 + protocol, 1.1239 + prPolicy, prValue, 1.1240 + flags, 1.1241 + nullptr, nullptr); 1.1242 + mStreams[stream] = channel; 1.1243 + 1.1244 + channel->mState = DataChannel::WAITING_TO_OPEN; 1.1245 + 1.1246 + LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__, 1.1247 + channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState)); 1.1248 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.1249 + DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, 1.1250 + this, channel)); 1.1251 + 1.1252 + LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); 1.1253 + 1.1254 + if (!SendOpenAckMessage(stream)) { 1.1255 + // XXX Only on EAGAIN!? And if not, then close the channel?? 1.1256 + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK; 1.1257 + StartDefer(); 1.1258 + } 1.1259 + 1.1260 + // Now process any queued data messages for the channel (which will 1.1261 + // themselves likely get queued until we leave WAITING_TO_OPEN, plus any 1.1262 + // more that come in before that happens) 1.1263 + DeliverQueuedData(stream); 1.1264 +} 1.1265 + 1.1266 +// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. 1.1267 +// That would make this code moot. Keep it for now for backwards compatibility. 1.1268 +void 1.1269 +DataChannelConnection::DeliverQueuedData(uint16_t stream) 1.1270 +{ 1.1271 + mLock.AssertCurrentThreadOwns(); 1.1272 + 1.1273 + uint32_t i = 0; 1.1274 + while (i < mQueuedData.Length()) { 1.1275 + // Careful! we may modify the array length from within the loop! 1.1276 + if (mQueuedData[i]->mStream == stream) { 1.1277 + LOG(("Delivering queued data for stream %u, length %u", 1.1278 + stream, mQueuedData[i]->mLength)); 1.1279 + // Deliver the queued data 1.1280 + HandleDataMessage(mQueuedData[i]->mPpid, 1.1281 + mQueuedData[i]->mData, mQueuedData[i]->mLength, 1.1282 + mQueuedData[i]->mStream); 1.1283 + mQueuedData.RemoveElementAt(i); 1.1284 + continue; // don't bump index since we removed the element 1.1285 + } 1.1286 + i++; 1.1287 + } 1.1288 +} 1.1289 + 1.1290 +void 1.1291 +DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, 1.1292 + size_t length, uint16_t stream) 1.1293 +{ 1.1294 + DataChannel *channel; 1.1295 + 1.1296 + mLock.AssertCurrentThreadOwns(); 1.1297 + 1.1298 + channel = FindChannelByStream(stream); 1.1299 + NS_ENSURE_TRUE_VOID(channel); 1.1300 + 1.1301 + LOG(("OpenAck received for stream %u, waiting=%d", stream, 1.1302 + (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0)); 1.1303 + 1.1304 + channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK; 1.1305 +} 1.1306 + 1.1307 +void 1.1308 +DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream) 1.1309 +{ 1.1310 + /* XXX: Send an error message? */ 1.1311 + LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream)); 1.1312 + // XXX Log to JS error console if possible 1.1313 +} 1.1314 + 1.1315 +void 1.1316 +DataChannelConnection::HandleDataMessage(uint32_t ppid, 1.1317 + const void *data, size_t length, 1.1318 + uint16_t stream) 1.1319 +{ 1.1320 + DataChannel *channel; 1.1321 + const char *buffer = (const char *) data; 1.1322 + 1.1323 + mLock.AssertCurrentThreadOwns(); 1.1324 + 1.1325 + channel = FindChannelByStream(stream); 1.1326 + 1.1327 + // XXX A closed channel may trip this... check 1.1328 + // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. 1.1329 + // That would make this code moot. Keep it for now for backwards compatibility. 1.1330 + if (!channel) { 1.1331 + // In the updated 0-RTT open case, the sender can send data immediately 1.1332 + // after Open, and doesn't set the in-order bit (since we don't have a 1.1333 + // response or ack). Also, with external negotiation, data can come in 1.1334 + // before we're told about the external negotiation. We need to buffer 1.1335 + // data until either a) Open comes in, if the ordering get messed up, 1.1336 + // or b) the app tells us this channel was externally negotiated. When 1.1337 + // these occur, we deliver the data. 1.1338 + 1.1339 + // Since this is rare and non-performance, keep a single list of queued 1.1340 + // data messages to deliver once the channel opens. 1.1341 + LOG(("Queuing data for stream %u, length %u", stream, length)); 1.1342 + // Copies data 1.1343 + mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length)); 1.1344 + return; 1.1345 + } 1.1346 + 1.1347 + // XXX should this be a simple if, no warnings/debugbreaks? 1.1348 + NS_ENSURE_TRUE_VOID(channel->mState != CLOSED); 1.1349 + 1.1350 + { 1.1351 + nsAutoCString recvData(buffer, length); // copies (<64) or allocates 1.1352 + bool is_binary = true; 1.1353 + 1.1354 + if (ppid == DATA_CHANNEL_PPID_DOMSTRING || 1.1355 + ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) { 1.1356 + is_binary = false; 1.1357 + } 1.1358 + if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) { 1.1359 + NS_WARNING("DataChannel message aborted by fragment type change!"); 1.1360 + channel->mRecvBuffer.Truncate(0); 1.1361 + } 1.1362 + channel->mIsRecvBinary = is_binary; 1.1363 + 1.1364 + switch (ppid) { 1.1365 + case DATA_CHANNEL_PPID_DOMSTRING: 1.1366 + case DATA_CHANNEL_PPID_BINARY: 1.1367 + channel->mRecvBuffer += recvData; 1.1368 + LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u", 1.1369 + is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(), 1.1370 + channel->mStream)); 1.1371 + return; // Not ready to notify application 1.1372 + 1.1373 + case DATA_CHANNEL_PPID_DOMSTRING_LAST: 1.1374 + LOG(("DataChannel: String message received of length %lu on channel %u", 1.1375 + length, channel->mStream)); 1.1376 + if (!channel->mRecvBuffer.IsEmpty()) { 1.1377 + channel->mRecvBuffer += recvData; 1.1378 + LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel)); 1.1379 + channel->SendOrQueue(new DataChannelOnMessageAvailable( 1.1380 + DataChannelOnMessageAvailable::ON_DATA, this, 1.1381 + channel, channel->mRecvBuffer, -1)); 1.1382 + channel->mRecvBuffer.Truncate(0); 1.1383 + return; 1.1384 + } 1.1385 + // else send using recvData normally 1.1386 + length = -1; // Flag for DOMString 1.1387 + 1.1388 + // WebSockets checks IsUTF8() here; we can try to deliver it 1.1389 + break; 1.1390 + 1.1391 + case DATA_CHANNEL_PPID_BINARY_LAST: 1.1392 + LOG(("DataChannel: Received binary message of length %lu on channel id %u", 1.1393 + length, channel->mStream)); 1.1394 + if (!channel->mRecvBuffer.IsEmpty()) { 1.1395 + channel->mRecvBuffer += recvData; 1.1396 + LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel)); 1.1397 + channel->SendOrQueue(new DataChannelOnMessageAvailable( 1.1398 + DataChannelOnMessageAvailable::ON_DATA, this, 1.1399 + channel, channel->mRecvBuffer, 1.1400 + channel->mRecvBuffer.Length())); 1.1401 + channel->mRecvBuffer.Truncate(0); 1.1402 + return; 1.1403 + } 1.1404 + // else send using recvData normally 1.1405 + break; 1.1406 + 1.1407 + default: 1.1408 + NS_ERROR("Unknown data PPID"); 1.1409 + return; 1.1410 + } 1.1411 + /* Notify onmessage */ 1.1412 + LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel)); 1.1413 + channel->SendOrQueue(new DataChannelOnMessageAvailable( 1.1414 + DataChannelOnMessageAvailable::ON_DATA, this, 1.1415 + channel, recvData, length)); 1.1416 + } 1.1417 +} 1.1418 + 1.1419 +// Called with mLock locked! 1.1420 +void 1.1421 +DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream) 1.1422 +{ 1.1423 + const struct rtcweb_datachannel_open_request *req; 1.1424 + const struct rtcweb_datachannel_ack *ack; 1.1425 + 1.1426 + mLock.AssertCurrentThreadOwns(); 1.1427 + 1.1428 + switch (ppid) { 1.1429 + case DATA_CHANNEL_PPID_CONTROL: 1.1430 + req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer); 1.1431 + 1.1432 + NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message 1.1433 + switch (req->msg_type) { 1.1434 + case DATA_CHANNEL_OPEN_REQUEST: 1.1435 + // structure includes a possibly-unused char label[1] (in a packed structure) 1.1436 + NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1); 1.1437 + 1.1438 + HandleOpenRequestMessage(req, length, stream); 1.1439 + break; 1.1440 + case DATA_CHANNEL_ACK: 1.1441 + // >= sizeof(*ack) checked above 1.1442 + 1.1443 + ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer); 1.1444 + HandleOpenAckMessage(ack, length, stream); 1.1445 + break; 1.1446 + default: 1.1447 + HandleUnknownMessage(ppid, length, stream); 1.1448 + break; 1.1449 + } 1.1450 + break; 1.1451 + case DATA_CHANNEL_PPID_DOMSTRING: 1.1452 + case DATA_CHANNEL_PPID_DOMSTRING_LAST: 1.1453 + case DATA_CHANNEL_PPID_BINARY: 1.1454 + case DATA_CHANNEL_PPID_BINARY_LAST: 1.1455 + HandleDataMessage(ppid, buffer, length, stream); 1.1456 + break; 1.1457 + default: 1.1458 + LOG(("Message of length %lu, PPID %u on stream %u received.", 1.1459 + length, ppid, stream)); 1.1460 + break; 1.1461 + } 1.1462 +} 1.1463 + 1.1464 +void 1.1465 +DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac) 1.1466 +{ 1.1467 + uint32_t i, n; 1.1468 + 1.1469 + switch (sac->sac_state) { 1.1470 + case SCTP_COMM_UP: 1.1471 + LOG(("Association change: SCTP_COMM_UP")); 1.1472 + if (mState == CONNECTING) { 1.1473 + mSocket = mMasterSocket; 1.1474 + mState = OPEN; 1.1475 + 1.1476 + SetEvenOdd(); 1.1477 + 1.1478 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.1479 + DataChannelOnMessageAvailable::ON_CONNECTION, 1.1480 + this, true)); 1.1481 + LOG(("DTLS connect() succeeded! Entering connected mode")); 1.1482 + 1.1483 + // Open any streams pending... 1.1484 + ProcessQueuedOpens(); 1.1485 + 1.1486 + } else if (mState == OPEN) { 1.1487 + LOG(("DataConnection Already OPEN")); 1.1488 + } else { 1.1489 + LOG(("Unexpected state: %d", mState)); 1.1490 + } 1.1491 + break; 1.1492 + case SCTP_COMM_LOST: 1.1493 + LOG(("Association change: SCTP_COMM_LOST")); 1.1494 + // This association is toast, so also close all the channels -- from mainthread! 1.1495 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.1496 + DataChannelOnMessageAvailable::ON_DISCONNECTED, 1.1497 + this)); 1.1498 + break; 1.1499 + case SCTP_RESTART: 1.1500 + LOG(("Association change: SCTP_RESTART")); 1.1501 + break; 1.1502 + case SCTP_SHUTDOWN_COMP: 1.1503 + LOG(("Association change: SCTP_SHUTDOWN_COMP")); 1.1504 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.1505 + DataChannelOnMessageAvailable::ON_DISCONNECTED, 1.1506 + this)); 1.1507 + break; 1.1508 + case SCTP_CANT_STR_ASSOC: 1.1509 + LOG(("Association change: SCTP_CANT_STR_ASSOC")); 1.1510 + break; 1.1511 + default: 1.1512 + LOG(("Association change: UNKNOWN")); 1.1513 + break; 1.1514 + } 1.1515 + LOG(("Association change: streams (in/out) = (%u/%u)", 1.1516 + sac->sac_inbound_streams, sac->sac_outbound_streams)); 1.1517 + 1.1518 + NS_ENSURE_TRUE_VOID(sac); 1.1519 + n = sac->sac_length - sizeof(*sac); 1.1520 + if (((sac->sac_state == SCTP_COMM_UP) || 1.1521 + (sac->sac_state == SCTP_RESTART)) && (n > 0)) { 1.1522 + for (i = 0; i < n; ++i) { 1.1523 + switch (sac->sac_info[i]) { 1.1524 + case SCTP_ASSOC_SUPPORTS_PR: 1.1525 + LOG(("Supports: PR")); 1.1526 + break; 1.1527 + case SCTP_ASSOC_SUPPORTS_AUTH: 1.1528 + LOG(("Supports: AUTH")); 1.1529 + break; 1.1530 + case SCTP_ASSOC_SUPPORTS_ASCONF: 1.1531 + LOG(("Supports: ASCONF")); 1.1532 + break; 1.1533 + case SCTP_ASSOC_SUPPORTS_MULTIBUF: 1.1534 + LOG(("Supports: MULTIBUF")); 1.1535 + break; 1.1536 + case SCTP_ASSOC_SUPPORTS_RE_CONFIG: 1.1537 + LOG(("Supports: RE-CONFIG")); 1.1538 + break; 1.1539 + default: 1.1540 + LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i])); 1.1541 + break; 1.1542 + } 1.1543 + } 1.1544 + } else if (((sac->sac_state == SCTP_COMM_LOST) || 1.1545 + (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) { 1.1546 + LOG(("Association: ABORT =")); 1.1547 + for (i = 0; i < n; ++i) { 1.1548 + LOG((" 0x%02x", sac->sac_info[i])); 1.1549 + } 1.1550 + } 1.1551 + if ((sac->sac_state == SCTP_CANT_STR_ASSOC) || 1.1552 + (sac->sac_state == SCTP_SHUTDOWN_COMP) || 1.1553 + (sac->sac_state == SCTP_COMM_LOST)) { 1.1554 + return; 1.1555 + } 1.1556 +} 1.1557 + 1.1558 +void 1.1559 +DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc) 1.1560 +{ 1.1561 + char addr_buf[INET6_ADDRSTRLEN]; 1.1562 + const char *addr = ""; 1.1563 + struct sockaddr_in *sin; 1.1564 + struct sockaddr_in6 *sin6; 1.1565 +#if defined(__Userspace_os_Windows) 1.1566 + DWORD addr_len = INET6_ADDRSTRLEN; 1.1567 +#endif 1.1568 + 1.1569 + switch (spc->spc_aaddr.ss_family) { 1.1570 + case AF_INET: 1.1571 + sin = (struct sockaddr_in *)&spc->spc_aaddr; 1.1572 +#if !defined(__Userspace_os_Windows) 1.1573 + addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN); 1.1574 +#else 1.1575 + if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr, 1.1576 + addr_buf, &addr_len)) { 1.1577 + return; 1.1578 + } 1.1579 +#endif 1.1580 + break; 1.1581 + case AF_INET6: 1.1582 + sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr; 1.1583 +#if !defined(__Userspace_os_Windows) 1.1584 + addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN); 1.1585 +#else 1.1586 + if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr, 1.1587 + addr_buf, &addr_len)) { 1.1588 + return; 1.1589 + } 1.1590 +#endif 1.1591 + case AF_CONN: 1.1592 + addr = "DTLS connection"; 1.1593 + break; 1.1594 + default: 1.1595 + break; 1.1596 + } 1.1597 + LOG(("Peer address %s is now ", addr)); 1.1598 + switch (spc->spc_state) { 1.1599 + case SCTP_ADDR_AVAILABLE: 1.1600 + LOG(("SCTP_ADDR_AVAILABLE")); 1.1601 + break; 1.1602 + case SCTP_ADDR_UNREACHABLE: 1.1603 + LOG(("SCTP_ADDR_UNREACHABLE")); 1.1604 + break; 1.1605 + case SCTP_ADDR_REMOVED: 1.1606 + LOG(("SCTP_ADDR_REMOVED")); 1.1607 + break; 1.1608 + case SCTP_ADDR_ADDED: 1.1609 + LOG(("SCTP_ADDR_ADDED")); 1.1610 + break; 1.1611 + case SCTP_ADDR_MADE_PRIM: 1.1612 + LOG(("SCTP_ADDR_MADE_PRIM")); 1.1613 + break; 1.1614 + case SCTP_ADDR_CONFIRMED: 1.1615 + LOG(("SCTP_ADDR_CONFIRMED")); 1.1616 + break; 1.1617 + default: 1.1618 + LOG(("UNKNOWN")); 1.1619 + break; 1.1620 + } 1.1621 + LOG((" (error = 0x%08x).\n", spc->spc_error)); 1.1622 +} 1.1623 + 1.1624 +void 1.1625 +DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre) 1.1626 +{ 1.1627 + size_t i, n; 1.1628 + 1.1629 + n = sre->sre_length - sizeof(struct sctp_remote_error); 1.1630 + LOG(("Remote Error (error = 0x%04x): ", sre->sre_error)); 1.1631 + for (i = 0; i < n; ++i) { 1.1632 + LOG((" 0x%02x", sre-> sre_data[i])); 1.1633 + } 1.1634 +} 1.1635 + 1.1636 +void 1.1637 +DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse) 1.1638 +{ 1.1639 + LOG(("Shutdown event.")); 1.1640 + /* XXX: notify all channels. */ 1.1641 + // Attempts to actually send anything will fail 1.1642 +} 1.1643 + 1.1644 +void 1.1645 +DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai) 1.1646 +{ 1.1647 + LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind)); 1.1648 +} 1.1649 + 1.1650 +void 1.1651 +DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe) 1.1652 +{ 1.1653 + size_t i, n; 1.1654 + 1.1655 + if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) { 1.1656 + LOG(("Unsent ")); 1.1657 + } 1.1658 + if (ssfe->ssfe_flags & SCTP_DATA_SENT) { 1.1659 + LOG(("Sent ")); 1.1660 + } 1.1661 + if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) { 1.1662 + LOG(("(flags = %x) ", ssfe->ssfe_flags)); 1.1663 + } 1.1664 + LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x", 1.1665 + ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, 1.1666 + ssfe->ssfe_info.snd_flags, ssfe->ssfe_error)); 1.1667 + n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); 1.1668 + for (i = 0; i < n; ++i) { 1.1669 + LOG((" 0x%02x", ssfe->ssfe_data[i])); 1.1670 + } 1.1671 +} 1.1672 + 1.1673 +void 1.1674 +DataChannelConnection::ClearResets() 1.1675 +{ 1.1676 + // Clear all pending resets 1.1677 + if (!mStreamsResetting.IsEmpty()) { 1.1678 + LOG(("Clearing resets for %d streams", mStreamsResetting.Length())); 1.1679 + } 1.1680 + 1.1681 + for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) { 1.1682 + nsRefPtr<DataChannel> channel; 1.1683 + channel = FindChannelByStream(mStreamsResetting[i]); 1.1684 + if (channel) { 1.1685 + LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get())); 1.1686 + mStreams[channel->mStream] = nullptr; 1.1687 + } 1.1688 + } 1.1689 + mStreamsResetting.Clear(); 1.1690 +} 1.1691 + 1.1692 +void 1.1693 +DataChannelConnection::ResetOutgoingStream(uint16_t stream) 1.1694 +{ 1.1695 + uint32_t i; 1.1696 + 1.1697 + mLock.AssertCurrentThreadOwns(); 1.1698 + LOG(("Connection %p: Resetting outgoing stream %u", 1.1699 + (void *) this, stream)); 1.1700 + // Rarely has more than a couple items and only for a short time 1.1701 + for (i = 0; i < mStreamsResetting.Length(); ++i) { 1.1702 + if (mStreamsResetting[i] == stream) { 1.1703 + return; 1.1704 + } 1.1705 + } 1.1706 + mStreamsResetting.AppendElement(stream); 1.1707 +} 1.1708 + 1.1709 +void 1.1710 +DataChannelConnection::SendOutgoingStreamReset() 1.1711 +{ 1.1712 + struct sctp_reset_streams *srs; 1.1713 + uint32_t i; 1.1714 + size_t len; 1.1715 + 1.1716 + LOG(("Connection %p: Sending outgoing stream reset for %d streams", 1.1717 + (void *) this, mStreamsResetting.Length())); 1.1718 + mLock.AssertCurrentThreadOwns(); 1.1719 + if (mStreamsResetting.IsEmpty()) { 1.1720 + LOG(("No streams to reset")); 1.1721 + return; 1.1722 + } 1.1723 + len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t); 1.1724 + srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc 1.1725 + memset(srs, 0, len); 1.1726 + srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; 1.1727 + srs->srs_number_streams = mStreamsResetting.Length(); 1.1728 + for (i = 0; i < mStreamsResetting.Length(); ++i) { 1.1729 + srs->srs_stream_list[i] = mStreamsResetting[i]; 1.1730 + } 1.1731 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) { 1.1732 + LOG(("***failed: setsockopt RESET, errno %d", errno)); 1.1733 + // if errno == EALREADY, this is normal - we can't send another reset 1.1734 + // with one pending. 1.1735 + // When we get an incoming reset (which may be a response to our 1.1736 + // outstanding one), see if we have any pending outgoing resets and 1.1737 + // send them 1.1738 + } else { 1.1739 + mStreamsResetting.Clear(); 1.1740 + } 1.1741 + moz_free(srs); 1.1742 +} 1.1743 + 1.1744 +void 1.1745 +DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst) 1.1746 +{ 1.1747 + uint32_t n, i; 1.1748 + nsRefPtr<DataChannel> channel; // since we may null out the ref to the channel 1.1749 + 1.1750 + if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && 1.1751 + !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { 1.1752 + n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t); 1.1753 + for (i = 0; i < n; ++i) { 1.1754 + if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { 1.1755 + channel = FindChannelByStream(strrst->strreset_stream_list[i]); 1.1756 + if (channel) { 1.1757 + // The other side closed the channel 1.1758 + // We could be in three states: 1.1759 + // 1. Normal state (input and output streams (OPEN) 1.1760 + // Notify application, send a RESET in response on our 1.1761 + // outbound channel. Go to CLOSED 1.1762 + // 2. We sent our own reset (CLOSING); either they crossed on the 1.1763 + // wire, or this is a response to our Reset. 1.1764 + // Go to CLOSED 1.1765 + // 3. We've sent a open but haven't gotten a response yet (CONNECTING) 1.1766 + // I believe this is impossible, as we don't have an input stream yet. 1.1767 + 1.1768 + LOG(("Incoming: Channel %u closed, state %d", 1.1769 + channel->mStream, channel->mState)); 1.1770 + ASSERT_WEBRTC(channel->mState == DataChannel::OPEN || 1.1771 + channel->mState == DataChannel::CLOSING || 1.1772 + channel->mState == DataChannel::CONNECTING || 1.1773 + channel->mState == DataChannel::WAITING_TO_OPEN); 1.1774 + if (channel->mState == DataChannel::OPEN || 1.1775 + channel->mState == DataChannel::WAITING_TO_OPEN) { 1.1776 + ResetOutgoingStream(channel->mStream); 1.1777 + SendOutgoingStreamReset(); 1.1778 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.1779 + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, 1.1780 + channel)); 1.1781 + } 1.1782 + mStreams[channel->mStream] = nullptr; 1.1783 + 1.1784 + LOG(("Disconnected DataChannel %p from connection %p", 1.1785 + (void *) channel.get(), (void *) channel->mConnection.get())); 1.1786 + channel->Destroy(); 1.1787 + // At this point when we leave here, the object is a zombie held alive only by the DOM object 1.1788 + } else { 1.1789 + LOG(("Can't find incoming channel %d",i)); 1.1790 + } 1.1791 + } 1.1792 + } 1.1793 + } 1.1794 + 1.1795 + // In case we failed to send a RESET due to having one outstanding, process any pending resets now: 1.1796 + if (!mStreamsResetting.IsEmpty()) { 1.1797 + LOG(("Sending %d pending resets", mStreamsResetting.Length())); 1.1798 + SendOutgoingStreamReset(); 1.1799 + } 1.1800 +} 1.1801 + 1.1802 +void 1.1803 +DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg) 1.1804 +{ 1.1805 + uint16_t stream; 1.1806 + uint32_t i; 1.1807 + nsRefPtr<DataChannel> channel; 1.1808 + 1.1809 + if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { 1.1810 + LOG(("*** Failed increasing number of streams from %u (%u/%u)", 1.1811 + mStreams.Length(), 1.1812 + strchg->strchange_instrms, 1.1813 + strchg->strchange_outstrms)); 1.1814 + // XXX FIX! notify pending opens of failure 1.1815 + return; 1.1816 + } else { 1.1817 + if (strchg->strchange_instrms > mStreams.Length()) { 1.1818 + LOG(("Other side increased streams from %u to %u", 1.1819 + mStreams.Length(), strchg->strchange_instrms)); 1.1820 + } 1.1821 + if (strchg->strchange_outstrms > mStreams.Length() || 1.1822 + strchg->strchange_instrms > mStreams.Length()) { 1.1823 + uint16_t old_len = mStreams.Length(); 1.1824 + uint16_t new_len = std::max(strchg->strchange_outstrms, 1.1825 + strchg->strchange_instrms); 1.1826 + LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)", 1.1827 + old_len, new_len, new_len - old_len, 1.1828 + strchg->strchange_instrms)); 1.1829 + // make sure both are the same length 1.1830 + mStreams.AppendElements(new_len - old_len); 1.1831 + LOG(("New length = %d (was %d)", mStreams.Length(), old_len)); 1.1832 + for (size_t i = old_len; i < mStreams.Length(); ++i) { 1.1833 + mStreams[i] = nullptr; 1.1834 + } 1.1835 + // Re-process any channels waiting for streams. 1.1836 + // Linear search, but we don't increase channels often and 1.1837 + // the array would only get long in case of an app error normally 1.1838 + 1.1839 + // Make sure we request enough streams if there's a big jump in streams 1.1840 + // Could make a more complex API for OpenXxxFinish() and avoid this loop 1.1841 + int32_t num_needed = mPending.GetSize(); 1.1842 + LOG(("%d of %d new streams already needed", num_needed, 1.1843 + new_len - old_len)); 1.1844 + num_needed -= (new_len - old_len); // number we added 1.1845 + if (num_needed > 0) { 1.1846 + if (num_needed < 16) 1.1847 + num_needed = 16; 1.1848 + LOG(("Not enough new streams, asking for %d more", num_needed)); 1.1849 + RequestMoreStreams(num_needed); 1.1850 + } else if (strchg->strchange_outstrms < strchg->strchange_instrms) { 1.1851 + LOG(("Requesting %d output streams to match partner", 1.1852 + strchg->strchange_instrms - strchg->strchange_outstrms)); 1.1853 + RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms); 1.1854 + } 1.1855 + 1.1856 + ProcessQueuedOpens(); 1.1857 + } 1.1858 + // else probably not a change in # of streams 1.1859 + } 1.1860 + 1.1861 + for (i = 0; i < mStreams.Length(); ++i) { 1.1862 + channel = mStreams[i]; 1.1863 + if (!channel) 1.1864 + continue; 1.1865 + 1.1866 + if ((channel->mState == CONNECTING) && 1.1867 + (channel->mStream == INVALID_STREAM)) { 1.1868 + if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) || 1.1869 + (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) { 1.1870 + /* XXX: Signal to the other end. */ 1.1871 + channel->mState = CLOSED; 1.1872 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.1873 + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, 1.1874 + channel)); 1.1875 + // maybe fire onError (bug 843625) 1.1876 + } else { 1.1877 + stream = FindFreeStream(); 1.1878 + if (stream != INVALID_STREAM) { 1.1879 + channel->mStream = stream; 1.1880 + mStreams[stream] = channel; 1.1881 + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; 1.1882 + /// XXX fix 1.1883 + StartDefer(); 1.1884 + } else { 1.1885 + /* We will not find more ... */ 1.1886 + break; 1.1887 + } 1.1888 + } 1.1889 + } 1.1890 + } 1.1891 +} 1.1892 + 1.1893 + 1.1894 +// Called with mLock locked! 1.1895 +void 1.1896 +DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n) 1.1897 +{ 1.1898 + mLock.AssertCurrentThreadOwns(); 1.1899 + if (notif->sn_header.sn_length != (uint32_t)n) { 1.1900 + return; 1.1901 + } 1.1902 + switch (notif->sn_header.sn_type) { 1.1903 + case SCTP_ASSOC_CHANGE: 1.1904 + HandleAssociationChangeEvent(&(notif->sn_assoc_change)); 1.1905 + break; 1.1906 + case SCTP_PEER_ADDR_CHANGE: 1.1907 + HandlePeerAddressChangeEvent(&(notif->sn_paddr_change)); 1.1908 + break; 1.1909 + case SCTP_REMOTE_ERROR: 1.1910 + HandleRemoteErrorEvent(&(notif->sn_remote_error)); 1.1911 + break; 1.1912 + case SCTP_SHUTDOWN_EVENT: 1.1913 + HandleShutdownEvent(&(notif->sn_shutdown_event)); 1.1914 + break; 1.1915 + case SCTP_ADAPTATION_INDICATION: 1.1916 + HandleAdaptationIndication(&(notif->sn_adaptation_event)); 1.1917 + break; 1.1918 + case SCTP_PARTIAL_DELIVERY_EVENT: 1.1919 + LOG(("SCTP_PARTIAL_DELIVERY_EVENT")); 1.1920 + break; 1.1921 + case SCTP_AUTHENTICATION_EVENT: 1.1922 + LOG(("SCTP_AUTHENTICATION_EVENT")); 1.1923 + break; 1.1924 + case SCTP_SENDER_DRY_EVENT: 1.1925 + //LOG(("SCTP_SENDER_DRY_EVENT")); 1.1926 + break; 1.1927 + case SCTP_NOTIFICATIONS_STOPPED_EVENT: 1.1928 + LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT")); 1.1929 + break; 1.1930 + case SCTP_SEND_FAILED_EVENT: 1.1931 + HandleSendFailedEvent(&(notif->sn_send_failed_event)); 1.1932 + break; 1.1933 + case SCTP_STREAM_RESET_EVENT: 1.1934 + HandleStreamResetEvent(&(notif->sn_strreset_event)); 1.1935 + break; 1.1936 + case SCTP_ASSOC_RESET_EVENT: 1.1937 + LOG(("SCTP_ASSOC_RESET_EVENT")); 1.1938 + break; 1.1939 + case SCTP_STREAM_CHANGE_EVENT: 1.1940 + HandleStreamChangeEvent(&(notif->sn_strchange_event)); 1.1941 + break; 1.1942 + default: 1.1943 + LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type)); 1.1944 + break; 1.1945 + } 1.1946 + } 1.1947 + 1.1948 +int 1.1949 +DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen, 1.1950 + struct sctp_rcvinfo rcv, int32_t flags) 1.1951 +{ 1.1952 + ASSERT_WEBRTC(!NS_IsMainThread()); 1.1953 + 1.1954 + if (!data) { 1.1955 + usrsctp_close(sock); // SCTP has finished shutting down 1.1956 + } else { 1.1957 + MutexAutoLock lock(mLock); 1.1958 + if (flags & MSG_NOTIFICATION) { 1.1959 + HandleNotification(static_cast<union sctp_notification *>(data), datalen); 1.1960 + } else { 1.1961 + HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid); 1.1962 + } 1.1963 + } 1.1964 + // sctp allocates 'data' with malloc(), and expects the receiver to free 1.1965 + // it (presumably with free). 1.1966 + // XXX future optimization: try to deliver messages without an internal 1.1967 + // alloc/copy, and if so delay the free until later. 1.1968 + free(data); 1.1969 + // usrsctp defines the callback as returning an int, but doesn't use it 1.1970 + return 1; 1.1971 +} 1.1972 + 1.1973 +already_AddRefed<DataChannel> 1.1974 +DataChannelConnection::Open(const nsACString& label, const nsACString& protocol, 1.1975 + Type type, bool inOrder, 1.1976 + uint32_t prValue, DataChannelListener *aListener, 1.1977 + nsISupports *aContext, bool aExternalNegotiated, 1.1978 + uint16_t aStream) 1.1979 +{ 1.1980 + // aStream == INVALID_STREAM to have the protocol allocate 1.1981 + uint16_t prPolicy = SCTP_PR_SCTP_NONE; 1.1982 + uint32_t flags; 1.1983 + 1.1984 + LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u", 1.1985 + PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(), 1.1986 + type, inOrder, prValue, aListener, aContext, 1.1987 + aExternalNegotiated ? "true" : "false", aStream)); 1.1988 + switch (type) { 1.1989 + case DATA_CHANNEL_RELIABLE: 1.1990 + prPolicy = SCTP_PR_SCTP_NONE; 1.1991 + break; 1.1992 + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: 1.1993 + prPolicy = SCTP_PR_SCTP_RTX; 1.1994 + break; 1.1995 + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: 1.1996 + prPolicy = SCTP_PR_SCTP_TTL; 1.1997 + break; 1.1998 + } 1.1999 + if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) { 1.2000 + return nullptr; 1.2001 + } 1.2002 + 1.2003 + // Don't look past currently-negotiated streams 1.2004 + if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) { 1.2005 + LOG(("ERROR: external negotiation of already-open channel %u", aStream)); 1.2006 + // XXX How do we indicate this up to the application? Probably the 1.2007 + // caller's job, but we may need to return an error code. 1.2008 + return nullptr; 1.2009 + } 1.2010 + 1.2011 + flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; 1.2012 + nsRefPtr<DataChannel> channel(new DataChannel(this, 1.2013 + aStream, 1.2014 + DataChannel::CONNECTING, 1.2015 + label, protocol, 1.2016 + type, prValue, 1.2017 + flags, 1.2018 + aListener, aContext)); 1.2019 + if (aExternalNegotiated) { 1.2020 + channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED; 1.2021 + } 1.2022 + 1.2023 + MutexAutoLock lock(mLock); // OpenFinish assumes this 1.2024 + return OpenFinish(channel.forget()); 1.2025 +} 1.2026 + 1.2027 +// Separate routine so we can also call it to finish up from pending opens 1.2028 +already_AddRefed<DataChannel> 1.2029 +DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel) 1.2030 +{ 1.2031 + nsRefPtr<DataChannel> channel(aChannel); // takes the reference passed in 1.2032 + // Normally 1 reference if called from ::Open(), or 2 if called from 1.2033 + // ProcessQueuedOpens() unless the DOMDataChannel was gc'd 1.2034 + uint16_t stream = channel->mStream; 1.2035 + bool queue = false; 1.2036 + 1.2037 + mLock.AssertCurrentThreadOwns(); 1.2038 + 1.2039 + // Cases we care about: 1.2040 + // Pre-negotiated: 1.2041 + // Not Open: 1.2042 + // Doesn't fit: 1.2043 + // -> change initial ask or renegotiate after open 1.2044 + // -> queue open 1.2045 + // Open: 1.2046 + // Doesn't fit: 1.2047 + // -> RequestMoreStreams && queue 1.2048 + // Does fit: 1.2049 + // -> open 1.2050 + // Not negotiated: 1.2051 + // Not Open: 1.2052 + // -> queue open 1.2053 + // Open: 1.2054 + // -> Try to get a stream 1.2055 + // Doesn't fit: 1.2056 + // -> RequestMoreStreams && queue 1.2057 + // Does fit: 1.2058 + // -> open 1.2059 + // So the Open cases are basically the same 1.2060 + // Not Open cases are simply queue for non-negotiated, and 1.2061 + // either change the initial ask or possibly renegotiate after open. 1.2062 + 1.2063 + if (mState == OPEN) { 1.2064 + if (stream == INVALID_STREAM) { 1.2065 + stream = FindFreeStream(); // may be INVALID_STREAM if we need more 1.2066 + } 1.2067 + if (stream == INVALID_STREAM || stream >= mStreams.Length()) { 1.2068 + // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams 1.2069 + // to avoid going back immediately for more if the ask to N, N+1, etc 1.2070 + int32_t more_needed = (stream == INVALID_STREAM) ? 16 : 1.2071 + (stream-((int32_t)mStreams.Length())) + 16; 1.2072 + if (!RequestMoreStreams(more_needed)) { 1.2073 + // Something bad happened... we're done 1.2074 + goto request_error_cleanup; 1.2075 + } 1.2076 + queue = true; 1.2077 + } 1.2078 + } else { 1.2079 + // not OPEN 1.2080 + if (stream != INVALID_STREAM && stream >= mStreams.Length() && 1.2081 + mState == CLOSED) { 1.2082 + // Update number of streams for init message 1.2083 + struct sctp_initmsg initmsg; 1.2084 + socklen_t len = sizeof(initmsg); 1.2085 + int32_t total_needed = stream+16; 1.2086 + 1.2087 + memset(&initmsg, 0, sizeof(initmsg)); 1.2088 + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { 1.2089 + LOG(("*** failed getsockopt SCTP_INITMSG")); 1.2090 + goto request_error_cleanup; 1.2091 + } 1.2092 + LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed, 1.2093 + initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); 1.2094 + initmsg.sinit_num_ostreams = total_needed; 1.2095 + initmsg.sinit_max_instreams = MAX_NUM_STREAMS; 1.2096 + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, 1.2097 + (socklen_t)sizeof(initmsg)) < 0) { 1.2098 + LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); 1.2099 + goto request_error_cleanup; 1.2100 + } 1.2101 + 1.2102 + int32_t old_len = mStreams.Length(); 1.2103 + mStreams.AppendElements(total_needed - old_len); 1.2104 + for (int32_t i = old_len; i < total_needed; ++i) { 1.2105 + mStreams[i] = nullptr; 1.2106 + } 1.2107 + } 1.2108 + // else if state is CONNECTING, we'll just re-negotiate when OpenFinish 1.2109 + // is called, if needed 1.2110 + queue = true; 1.2111 + } 1.2112 + if (queue) { 1.2113 + LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream)); 1.2114 + // Also serves to mark we told the app 1.2115 + channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN; 1.2116 + channel->AddRef(); // we need a ref for the nsDeQue and one to return 1.2117 + mPending.Push(channel); 1.2118 + return channel.forget(); 1.2119 + } 1.2120 + 1.2121 + MOZ_ASSERT(stream != INVALID_STREAM); 1.2122 + // just allocated (& OPEN), or externally negotiated 1.2123 + mStreams[stream] = channel; // holds a reference 1.2124 + channel->mStream = stream; 1.2125 + 1.2126 +#ifdef TEST_QUEUED_DATA 1.2127 + // It's painful to write a test for this... 1.2128 + channel->mState = OPEN; 1.2129 + channel->mReady = true; 1.2130 + SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST); 1.2131 +#endif 1.2132 + 1.2133 + if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) { 1.2134 + // Don't send unordered until this gets cleared 1.2135 + channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK; 1.2136 + } 1.2137 + 1.2138 + if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { 1.2139 + if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol, 1.2140 + stream, 1.2141 + !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), 1.2142 + channel->mPrPolicy, channel->mPrValue)) { 1.2143 + LOG(("SendOpenRequest failed, errno = %d", errno)); 1.2144 + if (errno == EAGAIN || errno == EWOULDBLOCK) { 1.2145 + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; 1.2146 + StartDefer(); 1.2147 + 1.2148 + return channel.forget(); 1.2149 + } else { 1.2150 + if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { 1.2151 + // We already returned the channel to the app. 1.2152 + NS_ERROR("Failed to send open request"); 1.2153 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.2154 + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, 1.2155 + channel)); 1.2156 + } 1.2157 + // If we haven't returned the channel yet, it will get destroyed when we exit 1.2158 + // this function. 1.2159 + mStreams[stream] = nullptr; 1.2160 + channel->mStream = INVALID_STREAM; 1.2161 + // we'll be destroying the channel 1.2162 + channel->mState = CLOSED; 1.2163 + return nullptr; 1.2164 + } 1.2165 + /* NOTREACHED */ 1.2166 + } 1.2167 + } 1.2168 + // Either externally negotiated or we sent Open 1.2169 + channel->mState = OPEN; 1.2170 + channel->mReady = true; 1.2171 + // FIX? Move into DOMDataChannel? I don't think we can send it yet here 1.2172 + LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); 1.2173 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.2174 + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, 1.2175 + channel)); 1.2176 + 1.2177 + return channel.forget(); 1.2178 + 1.2179 +request_error_cleanup: 1.2180 + channel->mState = CLOSED; 1.2181 + if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { 1.2182 + // We already returned the channel to the app. 1.2183 + NS_ERROR("Failed to request more streams"); 1.2184 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.2185 + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, 1.2186 + channel)); 1.2187 + return channel.forget(); 1.2188 + } 1.2189 + // we'll be destroying the channel, but it never really got set up 1.2190 + // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and 1.2191 + // Dispatch it to ourselves 1.2192 + return nullptr; 1.2193 +} 1.2194 + 1.2195 +int32_t 1.2196 +DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data, 1.2197 + uint32_t length, uint32_t ppid) 1.2198 +{ 1.2199 + uint16_t flags; 1.2200 + struct sctp_sendv_spa spa; 1.2201 + int32_t result; 1.2202 + 1.2203 + NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0); 1.2204 + NS_WARN_IF_FALSE(length > 0, "Length is 0?!"); 1.2205 + 1.2206 + // To avoid problems where an in-order OPEN is lost and an 1.2207 + // out-of-order data message "beats" it, require data to be in-order 1.2208 + // until we get an ACK. 1.2209 + if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && 1.2210 + !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) { 1.2211 + flags = SCTP_UNORDERED; 1.2212 + } else { 1.2213 + flags = 0; 1.2214 + } 1.2215 + 1.2216 + spa.sendv_sndinfo.snd_ppid = htonl(ppid); 1.2217 + spa.sendv_sndinfo.snd_sid = channel->mStream; 1.2218 + spa.sendv_sndinfo.snd_flags = flags; 1.2219 + spa.sendv_sndinfo.snd_context = 0; 1.2220 + spa.sendv_sndinfo.snd_assoc_id = 0; 1.2221 + spa.sendv_flags = SCTP_SEND_SNDINFO_VALID; 1.2222 + 1.2223 + if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) { 1.2224 + spa.sendv_prinfo.pr_policy = channel->mPrPolicy; 1.2225 + spa.sendv_prinfo.pr_value = channel->mPrValue; 1.2226 + spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; 1.2227 + } 1.2228 + 1.2229 + // Note: Main-thread IO, but doesn't block! 1.2230 + // XXX FIX! to deal with heavy overruns of JS trying to pass data in 1.2231 + // (more than the buffersize) queue data onto another thread to do the 1.2232 + // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp 1.2233 + 1.2234 + // SCTP will return EMSGSIZE if the message is bigger than the buffer 1.2235 + // size (or EAGAIN if there isn't space) 1.2236 + if (channel->mBufferedData.IsEmpty()) { 1.2237 + result = usrsctp_sendv(mSocket, data, length, 1.2238 + nullptr, 0, 1.2239 + (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa), 1.2240 + SCTP_SENDV_SPA, 0); 1.2241 + LOG(("Sent buffer (len=%u), result=%d", length, result)); 1.2242 + } else { 1.2243 + // Fake EAGAIN if we're already buffering data 1.2244 + result = -1; 1.2245 + errno = EAGAIN; 1.2246 + } 1.2247 + if (result < 0) { 1.2248 + if (errno == EAGAIN || errno == EWOULDBLOCK) { 1.2249 + // queue data for resend! And queue any further data for the stream until it is... 1.2250 + BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc 1.2251 + channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array 1.2252 + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA; 1.2253 + LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length)); 1.2254 + StartDefer(); 1.2255 + return 0; 1.2256 + } 1.2257 + LOG(("error %d sending string", errno)); 1.2258 + } 1.2259 + return result; 1.2260 +} 1.2261 + 1.2262 +// Handles fragmenting binary messages 1.2263 +int32_t 1.2264 +DataChannelConnection::SendBinary(DataChannel *channel, const char *data, 1.2265 + uint32_t len, 1.2266 + uint32_t ppid_partial, uint32_t ppid_final) 1.2267 +{ 1.2268 + // Since there's a limit on network buffer size and no limits on message 1.2269 + // size, and we don't want to use EOR mode (multiple writes for a 1.2270 + // message, but all other streams are blocked until you finish sending 1.2271 + // this message), we need to add application-level fragmentation of large 1.2272 + // messages. On a reliable channel, these can be simply rebuilt into a 1.2273 + // large message. On an unreliable channel, we can't and don't know how 1.2274 + // long to wait, and there are no retransmissions, and no easy way to 1.2275 + // tell the user "this part is missing", so on unreliable channels we 1.2276 + // need to return an error if sending more bytes than the network buffers 1.2277 + // can hold, and perhaps a lower number. 1.2278 + 1.2279 + // We *really* don't want to do this from main thread! - and SendMsgInternal 1.2280 + // avoids blocking. 1.2281 + // This MUST be reliable and in-order for the reassembly to work 1.2282 + if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT && 1.2283 + channel->mPrPolicy == DATA_CHANNEL_RELIABLE && 1.2284 + !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) { 1.2285 + int32_t sent=0; 1.2286 + uint32_t origlen = len; 1.2287 + LOG(("Sending binary message length %u in chunks", len)); 1.2288 + // XXX check flags for out-of-order, or force in-order for large binary messages 1.2289 + while (len > 0) { 1.2290 + uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT); 1.2291 + uint32_t ppid; 1.2292 + len -= sendlen; 1.2293 + ppid = len > 0 ? ppid_partial : ppid_final; 1.2294 + LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid)); 1.2295 + // Note that these might end up being deferred and queued. 1.2296 + sent += SendMsgInternal(channel, data, sendlen, ppid); 1.2297 + data += sendlen; 1.2298 + } 1.2299 + LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued", 1.2300 + (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT, 1.2301 + origlen, sent, 1.2302 + channel->mBufferedData.Length())); 1.2303 + return sent; 1.2304 + } 1.2305 + NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT, 1.2306 + "Sending too-large data on unreliable channel!"); 1.2307 + 1.2308 + // This will fail if the message is too large (default 256K) 1.2309 + return SendMsgInternal(channel, data, len, ppid_final); 1.2310 +} 1.2311 + 1.2312 +class ReadBlobRunnable : public nsRunnable { 1.2313 +public: 1.2314 + ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream, 1.2315 + nsIInputStream* aBlob) : 1.2316 + mConnection(aConnection), 1.2317 + mStream(aStream), 1.2318 + mBlob(aBlob) 1.2319 + { } 1.2320 + 1.2321 + NS_IMETHODIMP Run() { 1.2322 + // ReadBlob() is responsible to releasing the reference 1.2323 + DataChannelConnection *self = mConnection; 1.2324 + self->ReadBlob(mConnection.forget(), mStream, mBlob); 1.2325 + return NS_OK; 1.2326 + } 1.2327 + 1.2328 +private: 1.2329 + // Make sure the Connection doesn't die while there are jobs outstanding. 1.2330 + // Let it die (if released by PeerConnectionImpl while we're running) 1.2331 + // when we send our runnable back to MainThread. Then ~DataChannelConnection 1.2332 + // can send the IOThread to MainThread to die in a runnable, avoiding 1.2333 + // unsafe event loop recursion. Evil. 1.2334 + nsRefPtr<DataChannelConnection> mConnection; 1.2335 + uint16_t mStream; 1.2336 + // Use RefCount for preventing the object is deleted when SendBlob returns. 1.2337 + nsRefPtr<nsIInputStream> mBlob; 1.2338 +}; 1.2339 + 1.2340 +int32_t 1.2341 +DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob) 1.2342 +{ 1.2343 + DataChannel *channel = mStreams[stream]; 1.2344 + NS_ENSURE_TRUE(channel, 0); 1.2345 + // Spawn a thread to send the data 1.2346 + if (!mInternalIOThread) { 1.2347 + nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread)); 1.2348 + if (NS_FAILED(res)) { 1.2349 + return -1; 1.2350 + } 1.2351 + } 1.2352 + 1.2353 + nsCOMPtr<nsIRunnable> runnable = new ReadBlobRunnable(this, stream, aBlob); 1.2354 + mInternalIOThread->Dispatch(runnable, NS_DISPATCH_NORMAL); 1.2355 + return 0; 1.2356 +} 1.2357 + 1.2358 +void 1.2359 +DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis, 1.2360 + uint16_t aStream, nsIInputStream* aBlob) 1.2361 +{ 1.2362 + // NOTE: 'aThis' has been forgotten by the caller to avoid releasing 1.2363 + // it off mainthread; if PeerConnectionImpl has released then we want 1.2364 + // ~DataChannelConnection() to run on MainThread 1.2365 + 1.2366 + // XXX to do this safely, we must enqueue these atomically onto the 1.2367 + // output socket. We need a sender thread(s?) to enque data into the 1.2368 + // socket and to avoid main-thread IO that might block. Even on a 1.2369 + // background thread, we may not want to block on one stream's data. 1.2370 + // I.e. run non-blocking and service multiple channels. 1.2371 + 1.2372 + // For now as a hack, send as a single blast of queued packets which may 1.2373 + // be deferred until buffer space is available. 1.2374 + nsCString temp; 1.2375 + uint64_t len; 1.2376 + nsCOMPtr<nsIThread> mainThread; 1.2377 + NS_GetMainThread(getter_AddRefs(mainThread)); 1.2378 + 1.2379 + if (NS_FAILED(aBlob->Available(&len)) || 1.2380 + NS_FAILED(NS_ReadInputStreamToString(aBlob, temp, len))) { 1.2381 + // Bug 966602: Doesn't return an error to the caller via onerror. 1.2382 + // We must release DataChannelConnection on MainThread to avoid issues (bug 876167) 1.2383 + NS_ProxyRelease(mainThread, aThis.take()); 1.2384 + return; 1.2385 + } 1.2386 + aBlob->Close(); 1.2387 + RUN_ON_THREAD(mainThread, WrapRunnable(nsRefPtr<DataChannelConnection>(aThis), 1.2388 + &DataChannelConnection::SendBinaryMsg, 1.2389 + aStream, temp), 1.2390 + NS_DISPATCH_NORMAL); 1.2391 +} 1.2392 + 1.2393 +void 1.2394 +DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList) 1.2395 +{ 1.2396 + ASSERT_WEBRTC(NS_IsMainThread()); 1.2397 + for (uint32_t i = 0; i < mStreams.Length(); ++i) { 1.2398 + if (mStreams[i]) { 1.2399 + aStreamList->push_back(mStreams[i]->mStream); 1.2400 + } 1.2401 + } 1.2402 +} 1.2403 + 1.2404 +int32_t 1.2405 +DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg, 1.2406 + bool isBinary) 1.2407 +{ 1.2408 + ASSERT_WEBRTC(NS_IsMainThread()); 1.2409 + // We really could allow this from other threads, so long as we deal with 1.2410 + // asynchronosity issues with channels closing, in particular access to 1.2411 + // mStreams, and issues with the association closing (access to mSocket). 1.2412 + 1.2413 + const char *data = aMsg.BeginReading(); 1.2414 + uint32_t len = aMsg.Length(); 1.2415 + DataChannel *channel; 1.2416 + 1.2417 + LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len)); 1.2418 + // XXX if we want more efficiency, translate flags once at open time 1.2419 + channel = mStreams[stream]; 1.2420 + NS_ENSURE_TRUE(channel, 0); 1.2421 + 1.2422 + if (isBinary) 1.2423 + return SendBinary(channel, data, len, 1.2424 + DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST); 1.2425 + return SendBinary(channel, data, len, 1.2426 + DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST); 1.2427 +} 1.2428 + 1.2429 +void 1.2430 +DataChannelConnection::Close(DataChannel *aChannel) 1.2431 +{ 1.2432 + MutexAutoLock lock(mLock); 1.2433 + CloseInt(aChannel); 1.2434 +} 1.2435 + 1.2436 +// So we can call Close() with the lock already held 1.2437 +// Called from someone who holds a ref via ::Close(), or from ~DataChannel 1.2438 +void 1.2439 +DataChannelConnection::CloseInt(DataChannel *aChannel) 1.2440 +{ 1.2441 + MOZ_ASSERT(aChannel); 1.2442 + nsRefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us 1.2443 + 1.2444 + mLock.AssertCurrentThreadOwns(); 1.2445 + LOG(("Connection %p/Channel %p: Closing stream %u", 1.2446 + channel->mConnection.get(), channel.get(), channel->mStream)); 1.2447 + // re-test since it may have closed before the lock was grabbed 1.2448 + if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) { 1.2449 + LOG(("Channel already closing/closed (%u)", aChannel->mState)); 1.2450 + if (mState == CLOSED && channel->mStream != INVALID_STREAM) { 1.2451 + // called from CloseAll() 1.2452 + // we're not going to hang around waiting any more 1.2453 + mStreams[channel->mStream] = nullptr; 1.2454 + } 1.2455 + return; 1.2456 + } 1.2457 + aChannel->mBufferedData.Clear(); 1.2458 + if (channel->mStream != INVALID_STREAM) { 1.2459 + ResetOutgoingStream(channel->mStream); 1.2460 + if (mState == CLOSED) { // called from CloseAll() 1.2461 + // Let resets accumulate then send all at once in CloseAll() 1.2462 + // we're not going to hang around waiting 1.2463 + mStreams[channel->mStream] = nullptr; 1.2464 + } else { 1.2465 + SendOutgoingStreamReset(); 1.2466 + } 1.2467 + } 1.2468 + aChannel->mState = CLOSING; 1.2469 + if (mState == CLOSED) { 1.2470 + // we're not going to hang around waiting 1.2471 + channel->Destroy(); 1.2472 + } 1.2473 + // At this point when we leave here, the object is a zombie held alive only by the DOM object 1.2474 +} 1.2475 + 1.2476 +void DataChannelConnection::CloseAll() 1.2477 +{ 1.2478 + LOG(("Closing all channels (connection %p)", (void*) this)); 1.2479 + // Don't need to lock here 1.2480 + 1.2481 + // Make sure no more channels will be opened 1.2482 + { 1.2483 + MutexAutoLock lock(mLock); 1.2484 + mState = CLOSED; 1.2485 + } 1.2486 + 1.2487 + // Close current channels 1.2488 + // If there are runnables, they hold a strong ref and keep the channel 1.2489 + // and/or connection alive (even if in a CLOSED state) 1.2490 + bool closed_some = false; 1.2491 + for (uint32_t i = 0; i < mStreams.Length(); ++i) { 1.2492 + if (mStreams[i]) { 1.2493 + mStreams[i]->Close(); 1.2494 + closed_some = true; 1.2495 + } 1.2496 + } 1.2497 + 1.2498 + // Clean up any pending opens for channels 1.2499 + nsRefPtr<DataChannel> channel; 1.2500 + while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) { 1.2501 + LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream)); 1.2502 + channel->Close(); // also releases the ref on each iteration 1.2503 + closed_some = true; 1.2504 + } 1.2505 + // It's more efficient to let the Resets queue in shutdown and then 1.2506 + // SendOutgoingStreamReset() here. 1.2507 + if (closed_some) { 1.2508 + MutexAutoLock lock(mLock); 1.2509 + SendOutgoingStreamReset(); 1.2510 + } 1.2511 +} 1.2512 + 1.2513 +DataChannel::~DataChannel() 1.2514 +{ 1.2515 + // NS_ASSERTION since this is more "I think I caught all the cases that 1.2516 + // can cause this" than a true kill-the-program assertion. If this is 1.2517 + // wrong, nothing bad happens. A worst it's a leak. 1.2518 + NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel"); 1.2519 +} 1.2520 + 1.2521 +void 1.2522 +DataChannel::Close() 1.2523 +{ 1.2524 + ENSURE_DATACONNECTION; 1.2525 + mConnection->Close(this); 1.2526 +} 1.2527 + 1.2528 +// Used when disconnecting from the DataChannelConnection 1.2529 +void 1.2530 +DataChannel::Destroy() 1.2531 +{ 1.2532 + ENSURE_DATACONNECTION; 1.2533 + 1.2534 + LOG(("Destroying Data channel %u", mStream)); 1.2535 + MOZ_ASSERT_IF(mStream != INVALID_STREAM, 1.2536 + !mConnection->FindChannelByStream(mStream)); 1.2537 + mStream = INVALID_STREAM; 1.2538 + mState = CLOSED; 1.2539 + mConnection = nullptr; 1.2540 +} 1.2541 + 1.2542 +void 1.2543 +DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext) 1.2544 +{ 1.2545 + MutexAutoLock mLock(mListenerLock); 1.2546 + mContext = aContext; 1.2547 + mListener = aListener; 1.2548 +} 1.2549 + 1.2550 +// May be called from another (i.e. Main) thread! 1.2551 +void 1.2552 +DataChannel::AppReady() 1.2553 +{ 1.2554 + ENSURE_DATACONNECTION; 1.2555 + 1.2556 + MutexAutoLock lock(mConnection->mLock); 1.2557 + 1.2558 + mReady = true; 1.2559 + if (mState == WAITING_TO_OPEN) { 1.2560 + mState = OPEN; 1.2561 + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( 1.2562 + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection, 1.2563 + this)); 1.2564 + for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) { 1.2565 + nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i]; 1.2566 + MOZ_ASSERT(runnable); 1.2567 + NS_DispatchToMainThread(runnable); 1.2568 + } 1.2569 + } else { 1.2570 + NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN"); 1.2571 + } 1.2572 + mQueuedMessages.Clear(); 1.2573 + mQueuedMessages.Compact(); 1.2574 + // We never use it again... We could even allocate the array in the odd 1.2575 + // cases we need it. 1.2576 +} 1.2577 + 1.2578 +uint32_t 1.2579 +DataChannel::GetBufferedAmount() 1.2580 +{ 1.2581 + uint32_t buffered = 0; 1.2582 + for (uint32_t i = 0; i < mBufferedData.Length(); ++i) { 1.2583 + buffered += mBufferedData[i]->mLength; 1.2584 + } 1.2585 + return buffered; 1.2586 +} 1.2587 + 1.2588 +// Called with mLock locked! 1.2589 +void 1.2590 +DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage) 1.2591 +{ 1.2592 + if (!mReady && 1.2593 + (mState == CONNECTING || mState == WAITING_TO_OPEN)) { 1.2594 + mQueuedMessages.AppendElement(aMessage); 1.2595 + } else { 1.2596 + NS_DispatchToMainThread(aMessage); 1.2597 + } 1.2598 +} 1.2599 + 1.2600 +} // namespace mozilla