Thu, 15 Jan 2015 15:55:04 +0100
Back out 97036ab72558 which inappropriately compared turds to third parties.
michael@0 | 1 | /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
michael@0 | 2 | /* vim: set ts=2 et sw=2 tw=80: */ |
michael@0 | 3 | /* This Source Code Form is subject to the terms of the Mozilla Public |
michael@0 | 4 | * License, v. 2.0. If a copy of the MPL was not distributed with this file, |
michael@0 | 5 | * You can obtain one at http://mozilla.org/MPL/2.0/. */ |
michael@0 | 6 | |
michael@0 | 7 | #include <stdio.h> |
michael@0 | 8 | #include <stdlib.h> |
michael@0 | 9 | #if !defined(__Userspace_os_Windows) |
michael@0 | 10 | #include <arpa/inet.h> |
michael@0 | 11 | #endif |
michael@0 | 12 | // usrsctp.h expects to have errno definitions prior to its inclusion. |
michael@0 | 13 | #include <errno.h> |
michael@0 | 14 | |
michael@0 | 15 | #define SCTP_DEBUG 1 |
michael@0 | 16 | #define SCTP_STDINT_INCLUDE <stdint.h> |
michael@0 | 17 | |
michael@0 | 18 | #ifdef _MSC_VER |
michael@0 | 19 | // Disable "warning C4200: nonstandard extension used : zero-sized array in |
michael@0 | 20 | // struct/union" |
michael@0 | 21 | // ...which the third-party file usrsctp.h runs afoul of. |
michael@0 | 22 | #pragma warning(push) |
michael@0 | 23 | #pragma warning(disable:4200) |
michael@0 | 24 | #endif |
michael@0 | 25 | |
michael@0 | 26 | #include "usrsctp.h" |
michael@0 | 27 | |
michael@0 | 28 | #ifdef _MSC_VER |
michael@0 | 29 | #pragma warning(pop) |
michael@0 | 30 | #endif |
michael@0 | 31 | |
michael@0 | 32 | #include "DataChannelLog.h" |
michael@0 | 33 | |
michael@0 | 34 | #include "nsServiceManagerUtils.h" |
michael@0 | 35 | #include "nsIObserverService.h" |
michael@0 | 36 | #include "nsIObserver.h" |
michael@0 | 37 | #include "mozilla/Services.h" |
michael@0 | 38 | #include "nsProxyRelease.h" |
michael@0 | 39 | #include "nsThread.h" |
michael@0 | 40 | #include "nsThreadUtils.h" |
michael@0 | 41 | #include "nsAutoPtr.h" |
michael@0 | 42 | #include "nsNetUtil.h" |
michael@0 | 43 | #include "mozilla/StaticPtr.h" |
michael@0 | 44 | #ifdef MOZ_PEERCONNECTION |
michael@0 | 45 | #include "mtransport/runnable_utils.h" |
michael@0 | 46 | #endif |
michael@0 | 47 | |
michael@0 | 48 | #define DATACHANNEL_LOG(args) LOG(args) |
michael@0 | 49 | #include "DataChannel.h" |
michael@0 | 50 | #include "DataChannelProtocol.h" |
michael@0 | 51 | |
michael@0 | 52 | #ifdef PR_LOGGING |
michael@0 | 53 | PRLogModuleInfo* |
michael@0 | 54 | GetDataChannelLog() |
michael@0 | 55 | { |
michael@0 | 56 | static PRLogModuleInfo* sLog; |
michael@0 | 57 | if (!sLog) |
michael@0 | 58 | sLog = PR_NewLogModule("DataChannel"); |
michael@0 | 59 | return sLog; |
michael@0 | 60 | } |
michael@0 | 61 | |
michael@0 | 62 | PRLogModuleInfo* |
michael@0 | 63 | GetSCTPLog() |
michael@0 | 64 | { |
michael@0 | 65 | static PRLogModuleInfo* sLog; |
michael@0 | 66 | if (!sLog) |
michael@0 | 67 | sLog = PR_NewLogModule("SCTP"); |
michael@0 | 68 | return sLog; |
michael@0 | 69 | } |
michael@0 | 70 | #endif |
michael@0 | 71 | |
// Let us turn on and off important assertions in non-debug builds:
//  - DEBUG builds:               ASSERT_WEBRTC behaves exactly like MOZ_ASSERT.
//  - MOZ_WEBRTC_ASSERT_ALWAYS:   a failed check calls MOZ_CRASH(), even in
//                                non-debug builds.
//  - neither of the above:       the macro expands to an empty statement and
//                                does not evaluate its argument.  Without this
//                                fallback the macro would be undefined and
//                                every use in this file would fail to compile.
#ifdef DEBUG
#define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
#define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
#else
#define ASSERT_WEBRTC(x) do { } while (0)
#endif
michael@0 | 78 | |
michael@0 | 79 | static bool sctp_initialized; |
michael@0 | 80 | |
michael@0 | 81 | namespace mozilla { |
michael@0 | 82 | |
michael@0 | 83 | class DataChannelShutdown; |
michael@0 | 84 | StaticRefPtr<DataChannelShutdown> gDataChannelShutdown; |
michael@0 | 85 | |
michael@0 | 86 | class DataChannelShutdown : public nsIObserver |
michael@0 | 87 | { |
michael@0 | 88 | public: |
michael@0 | 89 | // This needs to be tied to some form object that is guaranteed to be |
michael@0 | 90 | // around (singleton likely) unless we want to shutdown sctp whenever |
michael@0 | 91 | // we're not using it (and in which case we'd keep a refcnt'd object |
michael@0 | 92 | // ref'd by each DataChannelConnection to release the SCTP usrlib via |
michael@0 | 93 | // sctp_finish) |
michael@0 | 94 | |
michael@0 | 95 | NS_DECL_ISUPPORTS |
michael@0 | 96 | |
michael@0 | 97 | DataChannelShutdown() {} |
michael@0 | 98 | |
michael@0 | 99 | void Init() |
michael@0 | 100 | { |
michael@0 | 101 | nsCOMPtr<nsIObserverService> observerService = |
michael@0 | 102 | mozilla::services::GetObserverService(); |
michael@0 | 103 | if (!observerService) |
michael@0 | 104 | return; |
michael@0 | 105 | |
michael@0 | 106 | nsresult rv = observerService->AddObserver(this, |
michael@0 | 107 | "profile-change-net-teardown", |
michael@0 | 108 | false); |
michael@0 | 109 | MOZ_ASSERT(rv == NS_OK); |
michael@0 | 110 | (void) rv; |
michael@0 | 111 | } |
michael@0 | 112 | |
michael@0 | 113 | virtual ~DataChannelShutdown() |
michael@0 | 114 | { |
michael@0 | 115 | nsCOMPtr<nsIObserverService> observerService = |
michael@0 | 116 | mozilla::services::GetObserverService(); |
michael@0 | 117 | if (observerService) |
michael@0 | 118 | observerService->RemoveObserver(this, "profile-change-net-teardown"); |
michael@0 | 119 | } |
michael@0 | 120 | |
michael@0 | 121 | NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic, |
michael@0 | 122 | const char16_t* aData) { |
michael@0 | 123 | if (strcmp(aTopic, "profile-change-net-teardown") == 0) { |
michael@0 | 124 | LOG(("Shutting down SCTP")); |
michael@0 | 125 | if (sctp_initialized) { |
michael@0 | 126 | usrsctp_finish(); |
michael@0 | 127 | sctp_initialized = false; |
michael@0 | 128 | } |
michael@0 | 129 | nsCOMPtr<nsIObserverService> observerService = |
michael@0 | 130 | mozilla::services::GetObserverService(); |
michael@0 | 131 | if (!observerService) |
michael@0 | 132 | return NS_ERROR_FAILURE; |
michael@0 | 133 | |
michael@0 | 134 | nsresult rv = observerService->RemoveObserver(this, |
michael@0 | 135 | "profile-change-net-teardown"); |
michael@0 | 136 | MOZ_ASSERT(rv == NS_OK); |
michael@0 | 137 | (void) rv; |
michael@0 | 138 | |
michael@0 | 139 | nsRefPtr<DataChannelShutdown> kungFuDeathGrip(this); |
michael@0 | 140 | gDataChannelShutdown = nullptr; |
michael@0 | 141 | } |
michael@0 | 142 | return NS_OK; |
michael@0 | 143 | } |
michael@0 | 144 | }; |
michael@0 | 145 | |
michael@0 | 146 | NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver); |
michael@0 | 147 | |
michael@0 | 148 | |
michael@0 | 149 | BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data, |
michael@0 | 150 | uint32_t length) : mLength(length) |
michael@0 | 151 | { |
michael@0 | 152 | mSpa = new sctp_sendv_spa; |
michael@0 | 153 | *mSpa = spa; |
michael@0 | 154 | char *tmp = new char[length]; // infallible malloc! |
michael@0 | 155 | memcpy(tmp, data, length); |
michael@0 | 156 | mData = tmp; |
michael@0 | 157 | } |
michael@0 | 158 | |
michael@0 | 159 | BufferedMsg::~BufferedMsg() |
michael@0 | 160 | { |
michael@0 | 161 | delete mSpa; |
michael@0 | 162 | delete mData; |
michael@0 | 163 | } |
michael@0 | 164 | |
michael@0 | 165 | static int |
michael@0 | 166 | receive_cb(struct socket* sock, union sctp_sockstore addr, |
michael@0 | 167 | void *data, size_t datalen, |
michael@0 | 168 | struct sctp_rcvinfo rcv, int flags, void *ulp_info) |
michael@0 | 169 | { |
michael@0 | 170 | DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info); |
michael@0 | 171 | return connection->ReceiveCallback(sock, data, datalen, rcv, flags); |
michael@0 | 172 | } |
michael@0 | 173 | |
michael@0 | 174 | #ifdef PR_LOGGING |
michael@0 | 175 | static void |
michael@0 | 176 | debug_printf(const char *format, ...) |
michael@0 | 177 | { |
michael@0 | 178 | va_list ap; |
michael@0 | 179 | char buffer[1024]; |
michael@0 | 180 | |
michael@0 | 181 | if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) { |
michael@0 | 182 | va_start(ap, format); |
michael@0 | 183 | #ifdef _WIN32 |
michael@0 | 184 | if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) { |
michael@0 | 185 | #else |
michael@0 | 186 | if (vsnprintf(buffer, sizeof(buffer), format, ap) > 0) { |
michael@0 | 187 | #endif |
michael@0 | 188 | PR_LogPrint("%s", buffer); |
michael@0 | 189 | } |
michael@0 | 190 | va_end(ap); |
michael@0 | 191 | } |
michael@0 | 192 | } |
michael@0 | 193 | #endif |
michael@0 | 194 | |
michael@0 | 195 | DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) : |
michael@0 | 196 | mLock("netwerk::sctp::DataChannelConnection") |
michael@0 | 197 | { |
michael@0 | 198 | mState = CLOSED; |
michael@0 | 199 | mSocket = nullptr; |
michael@0 | 200 | mMasterSocket = nullptr; |
michael@0 | 201 | mListener = listener->asWeakPtr(); |
michael@0 | 202 | mLocalPort = 0; |
michael@0 | 203 | mRemotePort = 0; |
michael@0 | 204 | mDeferTimeout = 10; |
michael@0 | 205 | mTimerRunning = false; |
michael@0 | 206 | LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get())); |
michael@0 | 207 | mInternalIOThread = nullptr; |
michael@0 | 208 | } |
michael@0 | 209 | |
michael@0 | 210 | DataChannelConnection::~DataChannelConnection() |
michael@0 | 211 | { |
michael@0 | 212 | LOG(("Deleting DataChannelConnection %p", (void *) this)); |
michael@0 | 213 | // This may die on the MainThread, or on the STS thread |
michael@0 | 214 | ASSERT_WEBRTC(mState == CLOSED); |
michael@0 | 215 | MOZ_ASSERT(!mMasterSocket); |
michael@0 | 216 | MOZ_ASSERT(mPending.GetSize() == 0); |
michael@0 | 217 | |
michael@0 | 218 | // Already disconnected from sigslot/mTransportFlow |
michael@0 | 219 | // TransportFlows must be released from the STS thread |
michael@0 | 220 | if (!IsSTSThread()) { |
michael@0 | 221 | ASSERT_WEBRTC(NS_IsMainThread()); |
michael@0 | 222 | if (mTransportFlow) { |
michael@0 | 223 | ASSERT_WEBRTC(mSTS); |
michael@0 | 224 | NS_ProxyRelease(mSTS, mTransportFlow); |
michael@0 | 225 | } |
michael@0 | 226 | |
michael@0 | 227 | if (mInternalIOThread) { |
michael@0 | 228 | // Avoid spinning the event thread from here (which if we're mainthread |
michael@0 | 229 | // is in the event loop already) |
michael@0 | 230 | NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread), |
michael@0 | 231 | &nsIThread::Shutdown), |
michael@0 | 232 | NS_DISPATCH_NORMAL); |
michael@0 | 233 | } |
michael@0 | 234 | } else { |
michael@0 | 235 | // on STS, safe to call shutdown |
michael@0 | 236 | if (mInternalIOThread) { |
michael@0 | 237 | mInternalIOThread->Shutdown(); |
michael@0 | 238 | } |
michael@0 | 239 | } |
michael@0 | 240 | } |
michael@0 | 241 | |
michael@0 | 242 | void |
michael@0 | 243 | DataChannelConnection::Destroy() |
michael@0 | 244 | { |
michael@0 | 245 | // Though it's probably ok to do this and close the sockets; |
michael@0 | 246 | // if we really want it to do true clean shutdowns it can |
michael@0 | 247 | // create a dependant Internal object that would remain around |
michael@0 | 248 | // until the network shut down the association or timed out. |
michael@0 | 249 | LOG(("Destroying DataChannelConnection %p", (void *) this)); |
michael@0 | 250 | ASSERT_WEBRTC(NS_IsMainThread()); |
michael@0 | 251 | CloseAll(); |
michael@0 | 252 | |
michael@0 | 253 | MutexAutoLock lock(mLock); |
michael@0 | 254 | // If we had a pending reset, we aren't waiting for it - clear the list so |
michael@0 | 255 | // we can deregister this DataChannelConnection without leaking. |
michael@0 | 256 | ClearResets(); |
michael@0 | 257 | |
michael@0 | 258 | MOZ_ASSERT(mSTS); |
michael@0 | 259 | ASSERT_WEBRTC(NS_IsMainThread()); |
michael@0 | 260 | // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed, |
michael@0 | 261 | // the usrsctp_close() calls can move back here (and just proxy the |
michael@0 | 262 | // disconnect_all()) |
michael@0 | 263 | RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this), |
michael@0 | 264 | &DataChannelConnection::DestroyOnSTS, |
michael@0 | 265 | mSocket, mMasterSocket), |
michael@0 | 266 | NS_DISPATCH_NORMAL); |
michael@0 | 267 | |
michael@0 | 268 | // These will be released on STS |
michael@0 | 269 | mSocket = nullptr; |
michael@0 | 270 | mMasterSocket = nullptr; // also a flag that we've Destroyed this connection |
michael@0 | 271 | |
michael@0 | 272 | // Must do this in Destroy() since we may then delete this object |
michael@0 | 273 | if (mUsingDtls) { |
michael@0 | 274 | usrsctp_deregister_address(static_cast<void *>(this)); |
michael@0 | 275 | LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this))); |
michael@0 | 276 | } |
michael@0 | 277 | |
michael@0 | 278 | // We can't get any more new callbacks from the SCTP library |
michael@0 | 279 | // All existing callbacks have refs to DataChannelConnection |
michael@0 | 280 | |
michael@0 | 281 | // nsDOMDataChannel objects have refs to DataChannels that have refs to us |
michael@0 | 282 | } |
michael@0 | 283 | |
michael@0 | 284 | void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket, |
michael@0 | 285 | struct socket *aSocket) |
michael@0 | 286 | { |
michael@0 | 287 | if (aSocket && aSocket != aMasterSocket) |
michael@0 | 288 | usrsctp_close(aSocket); |
michael@0 | 289 | if (aMasterSocket) |
michael@0 | 290 | usrsctp_close(aMasterSocket); |
michael@0 | 291 | |
michael@0 | 292 | disconnect_all(); |
michael@0 | 293 | } |
michael@0 | 294 | |
michael@0 | 295 | NS_IMPL_ISUPPORTS(DataChannelConnection, |
michael@0 | 296 | nsITimerCallback) |
michael@0 | 297 | |
michael@0 | 298 | bool |
michael@0 | 299 | DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls) |
michael@0 | 300 | { |
michael@0 | 301 | struct sctp_initmsg initmsg; |
michael@0 | 302 | struct sctp_udpencaps encaps; |
michael@0 | 303 | struct sctp_assoc_value av; |
michael@0 | 304 | struct sctp_event event; |
michael@0 | 305 | socklen_t len; |
michael@0 | 306 | |
michael@0 | 307 | uint16_t event_types[] = {SCTP_ASSOC_CHANGE, |
michael@0 | 308 | SCTP_PEER_ADDR_CHANGE, |
michael@0 | 309 | SCTP_REMOTE_ERROR, |
michael@0 | 310 | SCTP_SHUTDOWN_EVENT, |
michael@0 | 311 | SCTP_ADAPTATION_INDICATION, |
michael@0 | 312 | SCTP_SEND_FAILED_EVENT, |
michael@0 | 313 | SCTP_STREAM_RESET_EVENT, |
michael@0 | 314 | SCTP_STREAM_CHANGE_EVENT}; |
michael@0 | 315 | { |
michael@0 | 316 | ASSERT_WEBRTC(NS_IsMainThread()); |
michael@0 | 317 | |
michael@0 | 318 | // MutexAutoLock lock(mLock); Not needed since we're on mainthread always |
michael@0 | 319 | if (!sctp_initialized) { |
michael@0 | 320 | if (aUsingDtls) { |
michael@0 | 321 | LOG(("sctp_init(DTLS)")); |
michael@0 | 322 | #ifdef MOZ_PEERCONNECTION |
michael@0 | 323 | usrsctp_init(0, |
michael@0 | 324 | DataChannelConnection::SctpDtlsOutput, |
michael@0 | 325 | #ifdef PR_LOGGING |
michael@0 | 326 | debug_printf |
michael@0 | 327 | #else |
michael@0 | 328 | nullptr |
michael@0 | 329 | #endif |
michael@0 | 330 | ); |
michael@0 | 331 | #else |
michael@0 | 332 | NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport"); |
michael@0 | 333 | #endif |
michael@0 | 334 | } else { |
michael@0 | 335 | LOG(("sctp_init(%u)", aPort)); |
michael@0 | 336 | usrsctp_init(aPort, |
michael@0 | 337 | nullptr, |
michael@0 | 338 | #ifdef PR_LOGGING |
michael@0 | 339 | debug_printf |
michael@0 | 340 | #else |
michael@0 | 341 | nullptr |
michael@0 | 342 | #endif |
michael@0 | 343 | ); |
michael@0 | 344 | } |
michael@0 | 345 | |
michael@0 | 346 | #ifdef PR_LOGGING |
michael@0 | 347 | // Set logging to SCTP:PR_LOG_DEBUG to get SCTP debugs |
michael@0 | 348 | if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) { |
michael@0 | 349 | usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); |
michael@0 | 350 | } |
michael@0 | 351 | #endif |
michael@0 | 352 | usrsctp_sysctl_set_sctp_blackhole(2); |
michael@0 | 353 | // ECN is currently not supported by the Firefox code |
michael@0 | 354 | usrsctp_sysctl_set_sctp_ecn_enable(0); |
michael@0 | 355 | sctp_initialized = true; |
michael@0 | 356 | |
michael@0 | 357 | gDataChannelShutdown = new DataChannelShutdown(); |
michael@0 | 358 | gDataChannelShutdown->Init(); |
michael@0 | 359 | } |
michael@0 | 360 | } |
michael@0 | 361 | |
michael@0 | 362 | // XXX FIX! make this a global we get once |
michael@0 | 363 | // Find the STS thread |
michael@0 | 364 | nsresult rv; |
michael@0 | 365 | mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); |
michael@0 | 366 | MOZ_ASSERT(NS_SUCCEEDED(rv)); |
michael@0 | 367 | |
michael@0 | 368 | // Open sctp with a callback |
michael@0 | 369 | if ((mMasterSocket = usrsctp_socket( |
michael@0 | 370 | aUsingDtls ? AF_CONN : AF_INET, |
michael@0 | 371 | SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) { |
michael@0 | 372 | return false; |
michael@0 | 373 | } |
michael@0 | 374 | |
michael@0 | 375 | // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking |
michael@0 | 376 | // in associations for normal IO |
michael@0 | 377 | if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) { |
michael@0 | 378 | LOG(("Couldn't set non_blocking on SCTP socket")); |
michael@0 | 379 | // We can't handle connect() safely if it will block, not that this will |
michael@0 | 380 | // even happen. |
michael@0 | 381 | goto error_cleanup; |
michael@0 | 382 | } |
michael@0 | 383 | |
michael@0 | 384 | // Make sure when we close the socket, make sure it doesn't call us back again! |
michael@0 | 385 | // This would cause it try to use an invalid DataChannelConnection pointer |
michael@0 | 386 | struct linger l; |
michael@0 | 387 | l.l_onoff = 1; |
michael@0 | 388 | l.l_linger = 0; |
michael@0 | 389 | if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, |
michael@0 | 390 | (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { |
michael@0 | 391 | LOG(("Couldn't set SO_LINGER on SCTP socket")); |
michael@0 | 392 | // unsafe to allow it to continue if this fails |
michael@0 | 393 | goto error_cleanup; |
michael@0 | 394 | } |
michael@0 | 395 | |
michael@0 | 396 | // XXX Consider disabling this when we add proper SDP negotiation. |
michael@0 | 397 | // We may want to leave enabled for supporting 'cloning' of SDP offers, which |
michael@0 | 398 | // implies re-use of the same pseudo-port number, or forcing a renegotiation. |
michael@0 | 399 | { |
michael@0 | 400 | uint32_t on = 1; |
michael@0 | 401 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT, |
michael@0 | 402 | (const void *)&on, (socklen_t)sizeof(on)) < 0) { |
michael@0 | 403 | LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket")); |
michael@0 | 404 | } |
michael@0 | 405 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY, |
michael@0 | 406 | (const void *)&on, (socklen_t)sizeof(on)) < 0) { |
michael@0 | 407 | LOG(("Couldn't set SCTP_NODELAY on SCTP socket")); |
michael@0 | 408 | } |
michael@0 | 409 | } |
michael@0 | 410 | |
michael@0 | 411 | if (!aUsingDtls) { |
michael@0 | 412 | memset(&encaps, 0, sizeof(encaps)); |
michael@0 | 413 | encaps.sue_address.ss_family = AF_INET; |
michael@0 | 414 | encaps.sue_port = htons(aPort); |
michael@0 | 415 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT, |
michael@0 | 416 | (const void*)&encaps, |
michael@0 | 417 | (socklen_t)sizeof(struct sctp_udpencaps)) < 0) { |
michael@0 | 418 | LOG(("*** failed encaps errno %d", errno)); |
michael@0 | 419 | goto error_cleanup; |
michael@0 | 420 | } |
michael@0 | 421 | LOG(("SCTP encapsulation local port %d", aPort)); |
michael@0 | 422 | } |
michael@0 | 423 | |
michael@0 | 424 | av.assoc_id = SCTP_ALL_ASSOC; |
michael@0 | 425 | av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; |
michael@0 | 426 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, |
michael@0 | 427 | (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { |
michael@0 | 428 | LOG(("*** failed enable stream reset errno %d", errno)); |
michael@0 | 429 | goto error_cleanup; |
michael@0 | 430 | } |
michael@0 | 431 | |
michael@0 | 432 | /* Enable the events of interest. */ |
michael@0 | 433 | memset(&event, 0, sizeof(event)); |
michael@0 | 434 | event.se_assoc_id = SCTP_ALL_ASSOC; |
michael@0 | 435 | event.se_on = 1; |
michael@0 | 436 | for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) { |
michael@0 | 437 | event.se_type = event_types[i]; |
michael@0 | 438 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) { |
michael@0 | 439 | LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno)); |
michael@0 | 440 | goto error_cleanup; |
michael@0 | 441 | } |
michael@0 | 442 | } |
michael@0 | 443 | |
michael@0 | 444 | // Update number of streams |
michael@0 | 445 | mStreams.AppendElements(aNumStreams); |
michael@0 | 446 | for (uint32_t i = 0; i < aNumStreams; ++i) { |
michael@0 | 447 | mStreams[i] = nullptr; |
michael@0 | 448 | } |
michael@0 | 449 | memset(&initmsg, 0, sizeof(initmsg)); |
michael@0 | 450 | len = sizeof(initmsg); |
michael@0 | 451 | if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { |
michael@0 | 452 | LOG(("*** failed getsockopt SCTP_INITMSG")); |
michael@0 | 453 | goto error_cleanup; |
michael@0 | 454 | } |
michael@0 | 455 | LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams, |
michael@0 | 456 | initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); |
michael@0 | 457 | initmsg.sinit_num_ostreams = aNumStreams; |
michael@0 | 458 | initmsg.sinit_max_instreams = MAX_NUM_STREAMS; |
michael@0 | 459 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, |
michael@0 | 460 | (socklen_t)sizeof(initmsg)) < 0) { |
michael@0 | 461 | LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); |
michael@0 | 462 | goto error_cleanup; |
michael@0 | 463 | } |
michael@0 | 464 | |
michael@0 | 465 | mSocket = nullptr; |
michael@0 | 466 | if (aUsingDtls) { |
michael@0 | 467 | mUsingDtls = true; |
michael@0 | 468 | usrsctp_register_address(static_cast<void *>(this)); |
michael@0 | 469 | LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this))); |
michael@0 | 470 | } else { |
michael@0 | 471 | mUsingDtls = false; |
michael@0 | 472 | } |
michael@0 | 473 | return true; |
michael@0 | 474 | |
michael@0 | 475 | error_cleanup: |
michael@0 | 476 | usrsctp_close(mMasterSocket); |
michael@0 | 477 | mMasterSocket = nullptr; |
michael@0 | 478 | mUsingDtls = false; |
michael@0 | 479 | return false; |
michael@0 | 480 | } |
michael@0 | 481 | |
michael@0 | 482 | void |
michael@0 | 483 | DataChannelConnection::StartDefer() |
michael@0 | 484 | { |
michael@0 | 485 | nsresult rv; |
michael@0 | 486 | if (!NS_IsMainThread()) { |
michael@0 | 487 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 488 | DataChannelOnMessageAvailable::START_DEFER, |
michael@0 | 489 | this, (DataChannel *) nullptr)); |
michael@0 | 490 | return; |
michael@0 | 491 | } |
michael@0 | 492 | |
michael@0 | 493 | ASSERT_WEBRTC(NS_IsMainThread()); |
michael@0 | 494 | if (!mDeferredTimer) { |
michael@0 | 495 | mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); |
michael@0 | 496 | MOZ_ASSERT(mDeferredTimer); |
michael@0 | 497 | } |
michael@0 | 498 | |
michael@0 | 499 | if (!mTimerRunning) { |
michael@0 | 500 | rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, |
michael@0 | 501 | nsITimer::TYPE_ONE_SHOT); |
michael@0 | 502 | NS_ENSURE_TRUE_VOID(rv == NS_OK); |
michael@0 | 503 | |
michael@0 | 504 | mTimerRunning = true; |
michael@0 | 505 | } |
michael@0 | 506 | } |
michael@0 | 507 | |
michael@0 | 508 | // nsITimerCallback |
michael@0 | 509 | |
michael@0 | 510 | NS_IMETHODIMP |
michael@0 | 511 | DataChannelConnection::Notify(nsITimer *timer) |
michael@0 | 512 | { |
michael@0 | 513 | ASSERT_WEBRTC(NS_IsMainThread()); |
michael@0 | 514 | LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout)); |
michael@0 | 515 | |
michael@0 | 516 | if (timer == mDeferredTimer) { |
michael@0 | 517 | if (SendDeferredMessages()) { |
michael@0 | 518 | // Still blocked |
michael@0 | 519 | // we don't need a lock, since this must be main thread... |
michael@0 | 520 | nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, |
michael@0 | 521 | nsITimer::TYPE_ONE_SHOT); |
michael@0 | 522 | if (NS_FAILED(rv)) { |
michael@0 | 523 | LOG(("%s: cannot initialize open timer", __FUNCTION__)); |
michael@0 | 524 | // XXX and do....? |
michael@0 | 525 | return rv; |
michael@0 | 526 | } |
michael@0 | 527 | mTimerRunning = true; |
michael@0 | 528 | } else { |
michael@0 | 529 | LOG(("Turned off deferred send timer")); |
michael@0 | 530 | mTimerRunning = false; |
michael@0 | 531 | } |
michael@0 | 532 | } |
michael@0 | 533 | return NS_OK; |
michael@0 | 534 | } |
michael@0 | 535 | |
michael@0 | 536 | #ifdef MOZ_PEERCONNECTION |
michael@0 | 537 | void |
michael@0 | 538 | DataChannelConnection::SetEvenOdd() |
michael@0 | 539 | { |
michael@0 | 540 | ASSERT_WEBRTC(IsSTSThread()); |
michael@0 | 541 | |
michael@0 | 542 | TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>( |
michael@0 | 543 | mTransportFlow->GetLayer(TransportLayerDtls::ID())); |
michael@0 | 544 | MOZ_ASSERT(dtls); // DTLS is mandatory |
michael@0 | 545 | mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT); |
michael@0 | 546 | } |
michael@0 | 547 | |
michael@0 | 548 | bool |
michael@0 | 549 | DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport) |
michael@0 | 550 | { |
michael@0 | 551 | LOG(("Connect DTLS local %u, remote %u", localport, remoteport)); |
michael@0 | 552 | |
michael@0 | 553 | NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!"); |
michael@0 | 554 | NS_ENSURE_TRUE(aFlow, false); |
michael@0 | 555 | |
michael@0 | 556 | mTransportFlow = aFlow; |
michael@0 | 557 | mLocalPort = localport; |
michael@0 | 558 | mRemotePort = remoteport; |
michael@0 | 559 | mState = CONNECTING; |
michael@0 | 560 | |
michael@0 | 561 | RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this), |
michael@0 | 562 | &DataChannelConnection::SetSignals), |
michael@0 | 563 | NS_DISPATCH_NORMAL); |
michael@0 | 564 | return true; |
michael@0 | 565 | } |
michael@0 | 566 | |
michael@0 | 567 | void |
michael@0 | 568 | DataChannelConnection::SetSignals() |
michael@0 | 569 | { |
michael@0 | 570 | ASSERT_WEBRTC(IsSTSThread()); |
michael@0 | 571 | ASSERT_WEBRTC(mTransportFlow); |
michael@0 | 572 | LOG(("Setting transport signals, state: %d", mTransportFlow->state())); |
michael@0 | 573 | mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput); |
michael@0 | 574 | // SignalStateChange() doesn't call you with the initial state |
michael@0 | 575 | mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect); |
michael@0 | 576 | CompleteConnect(mTransportFlow, mTransportFlow->state()); |
michael@0 | 577 | } |
michael@0 | 578 | |
michael@0 | 579 | void |
michael@0 | 580 | DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state) |
michael@0 | 581 | { |
michael@0 | 582 | LOG(("Data transport state: %d", state)); |
michael@0 | 583 | MutexAutoLock lock(mLock); |
michael@0 | 584 | ASSERT_WEBRTC(IsSTSThread()); |
michael@0 | 585 | // We should abort connection on TS_ERROR. |
michael@0 | 586 | // Note however that the association will also fail (perhaps with a delay) and |
michael@0 | 587 | // notify us in that way |
michael@0 | 588 | if (state != TransportLayer::TS_OPEN || !mMasterSocket) |
michael@0 | 589 | return; |
michael@0 | 590 | |
michael@0 | 591 | struct sockaddr_conn addr; |
michael@0 | 592 | memset(&addr, 0, sizeof(addr)); |
michael@0 | 593 | addr.sconn_family = AF_CONN; |
michael@0 | 594 | #if defined(__Userspace_os_Darwin) |
michael@0 | 595 | addr.sconn_len = sizeof(addr); |
michael@0 | 596 | #endif |
michael@0 | 597 | addr.sconn_port = htons(mLocalPort); |
michael@0 | 598 | addr.sconn_addr = static_cast<void *>(this); |
michael@0 | 599 | |
michael@0 | 600 | LOG(("Calling usrsctp_bind")); |
michael@0 | 601 | int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), |
michael@0 | 602 | sizeof(addr)); |
michael@0 | 603 | if (r < 0) { |
michael@0 | 604 | LOG(("usrsctp_bind failed: %d", r)); |
michael@0 | 605 | } else { |
michael@0 | 606 | // This is the remote addr |
michael@0 | 607 | addr.sconn_port = htons(mRemotePort); |
michael@0 | 608 | LOG(("Calling usrsctp_connect")); |
michael@0 | 609 | r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), |
michael@0 | 610 | sizeof(addr)); |
michael@0 | 611 | if (r < 0) { |
michael@0 | 612 | if (errno == EINPROGRESS) { |
michael@0 | 613 | // non-blocking |
michael@0 | 614 | return; |
michael@0 | 615 | } else { |
michael@0 | 616 | LOG(("usrsctp_connect failed: %d", errno)); |
michael@0 | 617 | mState = CLOSED; |
michael@0 | 618 | } |
michael@0 | 619 | } else { |
michael@0 | 620 | // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that |
michael@0 | 621 | // This also avoids issues with calling TransportFlow stuff on Mainthread |
michael@0 | 622 | return; |
michael@0 | 623 | } |
michael@0 | 624 | } |
michael@0 | 625 | // Note: currently this doesn't actually notify the application |
michael@0 | 626 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 627 | DataChannelOnMessageAvailable::ON_CONNECTION, |
michael@0 | 628 | this, false)); |
michael@0 | 629 | return; |
michael@0 | 630 | } |
michael@0 | 631 | |
michael@0 | 632 | // Process any pending Opens |
michael@0 | 633 | void |
michael@0 | 634 | DataChannelConnection::ProcessQueuedOpens() |
michael@0 | 635 | { |
michael@0 | 636 | // The nsDeque holds channels with an AddRef applied. Another reference |
michael@0 | 637 | // (may) be held by the DOMDataChannel, unless it's been GC'd. No other |
michael@0 | 638 | // references should exist. |
michael@0 | 639 | |
michael@0 | 640 | // Can't copy nsDeque's. Move into temp array since any that fail will |
michael@0 | 641 | // go back to mPending |
michael@0 | 642 | nsDeque temp; |
michael@0 | 643 | DataChannel *temp_channel; // really already_AddRefed<> |
michael@0 | 644 | while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) { |
michael@0 | 645 | temp.Push(static_cast<void *>(temp_channel)); |
michael@0 | 646 | } |
michael@0 | 647 | |
michael@0 | 648 | nsRefPtr<DataChannel> channel; |
michael@0 | 649 | // All these entries have an AddRef(); make that explicit now via the dont_AddRef() |
michael@0 | 650 | while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) { |
michael@0 | 651 | if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { |
michael@0 | 652 | LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream)); |
michael@0 | 653 | channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN; |
michael@0 | 654 | // OpenFinish returns a reference itself, so we need to take it can Release it |
michael@0 | 655 | channel = OpenFinish(channel.forget()); // may reset the flag and re-push |
michael@0 | 656 | } else { |
michael@0 | 657 | NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?"); |
michael@0 | 658 | } |
michael@0 | 659 | } |
michael@0 | 660 | |
michael@0 | 661 | } |
michael@0 | 662 | void |
michael@0 | 663 | DataChannelConnection::SctpDtlsInput(TransportFlow *flow, |
michael@0 | 664 | const unsigned char *data, size_t len) |
michael@0 | 665 | { |
michael@0 | 666 | #ifdef PR_LOGGING |
michael@0 | 667 | if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) { |
michael@0 | 668 | char *buf; |
michael@0 | 669 | |
michael@0 | 670 | if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) { |
michael@0 | 671 | PR_LogPrint("%s", buf); |
michael@0 | 672 | usrsctp_freedumpbuffer(buf); |
michael@0 | 673 | } |
michael@0 | 674 | } |
michael@0 | 675 | #endif |
michael@0 | 676 | // Pass the data to SCTP |
michael@0 | 677 | usrsctp_conninput(static_cast<void *>(this), data, len, 0); |
michael@0 | 678 | } |
michael@0 | 679 | |
michael@0 | 680 | int |
michael@0 | 681 | DataChannelConnection::SendPacket(const unsigned char *data, size_t len, bool release) |
michael@0 | 682 | { |
michael@0 | 683 | //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len)); |
michael@0 | 684 | int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0; |
michael@0 | 685 | if (release) |
michael@0 | 686 | delete data; |
michael@0 | 687 | return res; |
michael@0 | 688 | } |
michael@0 | 689 | |
michael@0 | 690 | /* static */ |
michael@0 | 691 | int |
michael@0 | 692 | DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length, |
michael@0 | 693 | uint8_t tos, uint8_t set_df) |
michael@0 | 694 | { |
michael@0 | 695 | DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr); |
michael@0 | 696 | int res; |
michael@0 | 697 | |
michael@0 | 698 | #ifdef PR_LOGGING |
michael@0 | 699 | if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) { |
michael@0 | 700 | char *buf; |
michael@0 | 701 | |
michael@0 | 702 | if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) { |
michael@0 | 703 | PR_LogPrint("%s", buf); |
michael@0 | 704 | usrsctp_freedumpbuffer(buf); |
michael@0 | 705 | } |
michael@0 | 706 | } |
michael@0 | 707 | #endif |
michael@0 | 708 | // We're async proxying even if on the STSThread because this is called |
michael@0 | 709 | // with internal SCTP locks held in some cases (such as in usrsctp_connect()). |
michael@0 | 710 | // SCTP has an option for Apple, on IP connections only, to release at least |
michael@0 | 711 | // one of the locks before calling a packet output routine; with changes to |
michael@0 | 712 | // the underlying SCTP stack this might remove the need to use an async proxy. |
michael@0 | 713 | if (0 /*peer->IsSTSThread()*/) { |
michael@0 | 714 | res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false); |
michael@0 | 715 | } else { |
michael@0 | 716 | unsigned char *data = new unsigned char[length]; |
michael@0 | 717 | memcpy(data, buffer, length); |
michael@0 | 718 | res = -1; |
michael@0 | 719 | // XXX It might be worthwhile to add an assertion against the thread |
michael@0 | 720 | // somehow getting into the DataChannel/SCTP code again, as |
michael@0 | 721 | // DISPATCH_SYNC is not fully blocking. This may be tricky, as it |
michael@0 | 722 | // needs to be a per-thread check, not a global. |
michael@0 | 723 | peer->mSTS->Dispatch(WrapRunnable( |
michael@0 | 724 | nsRefPtr<DataChannelConnection>(peer), |
michael@0 | 725 | &DataChannelConnection::SendPacket, data, length, true), |
michael@0 | 726 | NS_DISPATCH_NORMAL); |
michael@0 | 727 | res = 0; // cheat! Packets can always be dropped later anyways |
michael@0 | 728 | } |
michael@0 | 729 | return res; |
michael@0 | 730 | } |
michael@0 | 731 | #endif |
michael@0 | 732 | |
michael@0 | 733 | #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT |
michael@0 | 734 | // listen for incoming associations |
michael@0 | 735 | // Blocks! - Don't call this from main thread! |
michael@0 | 736 | |
michael@0 | 737 | #error This code will not work as-is since SetEvenOdd() runs on Mainthread |
michael@0 | 738 | |
michael@0 | 739 | bool |
michael@0 | 740 | DataChannelConnection::Listen(unsigned short port) |
michael@0 | 741 | { |
michael@0 | 742 | struct sockaddr_in addr; |
michael@0 | 743 | socklen_t addr_len; |
michael@0 | 744 | |
michael@0 | 745 | NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); |
michael@0 | 746 | |
michael@0 | 747 | /* Acting as the 'server' */ |
michael@0 | 748 | memset((void *)&addr, 0, sizeof(addr)); |
michael@0 | 749 | #ifdef HAVE_SIN_LEN |
michael@0 | 750 | addr.sin_len = sizeof(struct sockaddr_in); |
michael@0 | 751 | #endif |
michael@0 | 752 | addr.sin_family = AF_INET; |
michael@0 | 753 | addr.sin_port = htons(port); |
michael@0 | 754 | addr.sin_addr.s_addr = htonl(INADDR_ANY); |
michael@0 | 755 | LOG(("Waiting for connections on port %u", ntohs(addr.sin_port))); |
michael@0 | 756 | mState = CONNECTING; |
michael@0 | 757 | if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) { |
michael@0 | 758 | LOG(("***Failed userspace_bind")); |
michael@0 | 759 | return false; |
michael@0 | 760 | } |
michael@0 | 761 | if (usrsctp_listen(mMasterSocket, 1) < 0) { |
michael@0 | 762 | LOG(("***Failed userspace_listen")); |
michael@0 | 763 | return false; |
michael@0 | 764 | } |
michael@0 | 765 | |
michael@0 | 766 | LOG(("Accepting connection")); |
michael@0 | 767 | addr_len = 0; |
michael@0 | 768 | if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) { |
michael@0 | 769 | LOG(("***Failed accept")); |
michael@0 | 770 | return false; |
michael@0 | 771 | } |
michael@0 | 772 | mState = OPEN; |
michael@0 | 773 | |
michael@0 | 774 | struct linger l; |
michael@0 | 775 | l.l_onoff = 1; |
michael@0 | 776 | l.l_linger = 0; |
michael@0 | 777 | if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, |
michael@0 | 778 | (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { |
michael@0 | 779 | LOG(("Couldn't set SO_LINGER on SCTP socket")); |
michael@0 | 780 | } |
michael@0 | 781 | |
michael@0 | 782 | SetEvenOdd(); |
michael@0 | 783 | |
michael@0 | 784 | // Notify Connection open |
michael@0 | 785 | // XXX We need to make sure connection sticks around until the message is delivered |
michael@0 | 786 | LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); |
michael@0 | 787 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 788 | DataChannelOnMessageAvailable::ON_CONNECTION, |
michael@0 | 789 | this, (DataChannel *) nullptr)); |
michael@0 | 790 | return true; |
michael@0 | 791 | } |
michael@0 | 792 | |
michael@0 | 793 | // Blocks! - Don't call this from main thread! |
michael@0 | 794 | bool |
michael@0 | 795 | DataChannelConnection::Connect(const char *addr, unsigned short port) |
michael@0 | 796 | { |
michael@0 | 797 | struct sockaddr_in addr4; |
michael@0 | 798 | struct sockaddr_in6 addr6; |
michael@0 | 799 | |
michael@0 | 800 | NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); |
michael@0 | 801 | |
michael@0 | 802 | /* Acting as the connector */ |
michael@0 | 803 | LOG(("Connecting to %s, port %u", addr, port)); |
michael@0 | 804 | memset((void *)&addr4, 0, sizeof(struct sockaddr_in)); |
michael@0 | 805 | memset((void *)&addr6, 0, sizeof(struct sockaddr_in6)); |
michael@0 | 806 | #ifdef HAVE_SIN_LEN |
michael@0 | 807 | addr4.sin_len = sizeof(struct sockaddr_in); |
michael@0 | 808 | #endif |
michael@0 | 809 | #ifdef HAVE_SIN6_LEN |
michael@0 | 810 | addr6.sin6_len = sizeof(struct sockaddr_in6); |
michael@0 | 811 | #endif |
michael@0 | 812 | addr4.sin_family = AF_INET; |
michael@0 | 813 | addr6.sin6_family = AF_INET6; |
michael@0 | 814 | addr4.sin_port = htons(port); |
michael@0 | 815 | addr6.sin6_port = htons(port); |
michael@0 | 816 | mState = CONNECTING; |
michael@0 | 817 | |
michael@0 | 818 | #if !defined(__Userspace_os_Windows) |
michael@0 | 819 | if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) { |
michael@0 | 820 | if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) { |
michael@0 | 821 | LOG(("*** Failed userspace_connect")); |
michael@0 | 822 | return false; |
michael@0 | 823 | } |
michael@0 | 824 | } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) { |
michael@0 | 825 | if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) { |
michael@0 | 826 | LOG(("*** Failed userspace_connect")); |
michael@0 | 827 | return false; |
michael@0 | 828 | } |
michael@0 | 829 | } else { |
michael@0 | 830 | LOG(("*** Illegal destination address.")); |
michael@0 | 831 | } |
michael@0 | 832 | #else |
michael@0 | 833 | { |
michael@0 | 834 | struct sockaddr_storage ss; |
michael@0 | 835 | int sslen = sizeof(ss); |
michael@0 | 836 | |
michael@0 | 837 | if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) { |
michael@0 | 838 | addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr; |
michael@0 | 839 | if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) { |
michael@0 | 840 | LOG(("*** Failed userspace_connect")); |
michael@0 | 841 | return false; |
michael@0 | 842 | } |
michael@0 | 843 | } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) { |
michael@0 | 844 | addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr; |
michael@0 | 845 | if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) { |
michael@0 | 846 | LOG(("*** Failed userspace_connect")); |
michael@0 | 847 | return false; |
michael@0 | 848 | } |
michael@0 | 849 | } else { |
michael@0 | 850 | LOG(("*** Illegal destination address.")); |
michael@0 | 851 | } |
michael@0 | 852 | } |
michael@0 | 853 | #endif |
michael@0 | 854 | |
michael@0 | 855 | mSocket = mMasterSocket; |
michael@0 | 856 | |
michael@0 | 857 | LOG(("connect() succeeded! Entering connected mode")); |
michael@0 | 858 | mState = OPEN; |
michael@0 | 859 | |
michael@0 | 860 | SetEvenOdd(); |
michael@0 | 861 | |
michael@0 | 862 | // Notify Connection open |
michael@0 | 863 | // XXX We need to make sure connection sticks around until the message is delivered |
michael@0 | 864 | LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); |
michael@0 | 865 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 866 | DataChannelOnMessageAvailable::ON_CONNECTION, |
michael@0 | 867 | this, (DataChannel *) nullptr)); |
michael@0 | 868 | return true; |
michael@0 | 869 | } |
michael@0 | 870 | #endif |
michael@0 | 871 | |
michael@0 | 872 | DataChannel * |
michael@0 | 873 | DataChannelConnection::FindChannelByStream(uint16_t stream) |
michael@0 | 874 | { |
michael@0 | 875 | return mStreams.SafeElementAt(stream); |
michael@0 | 876 | } |
michael@0 | 877 | |
michael@0 | 878 | uint16_t |
michael@0 | 879 | DataChannelConnection::FindFreeStream() |
michael@0 | 880 | { |
michael@0 | 881 | uint32_t i, j, limit; |
michael@0 | 882 | |
michael@0 | 883 | limit = mStreams.Length(); |
michael@0 | 884 | if (limit > MAX_NUM_STREAMS) |
michael@0 | 885 | limit = MAX_NUM_STREAMS; |
michael@0 | 886 | |
michael@0 | 887 | for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) { |
michael@0 | 888 | if (!mStreams[i]) { |
michael@0 | 889 | // Verify it's not still in the process of closing |
michael@0 | 890 | for (j = 0; j < mStreamsResetting.Length(); ++j) { |
michael@0 | 891 | if (mStreamsResetting[j] == i) { |
michael@0 | 892 | break; |
michael@0 | 893 | } |
michael@0 | 894 | } |
michael@0 | 895 | if (j == mStreamsResetting.Length()) |
michael@0 | 896 | break; |
michael@0 | 897 | } |
michael@0 | 898 | } |
michael@0 | 899 | if (i >= limit) { |
michael@0 | 900 | return INVALID_STREAM; |
michael@0 | 901 | } |
michael@0 | 902 | return i; |
michael@0 | 903 | } |
michael@0 | 904 | |
michael@0 | 905 | bool |
michael@0 | 906 | DataChannelConnection::RequestMoreStreams(int32_t aNeeded) |
michael@0 | 907 | { |
michael@0 | 908 | struct sctp_status status; |
michael@0 | 909 | struct sctp_add_streams sas; |
michael@0 | 910 | uint32_t outStreamsNeeded; |
michael@0 | 911 | socklen_t len; |
michael@0 | 912 | |
michael@0 | 913 | if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) { |
michael@0 | 914 | aNeeded = MAX_NUM_STREAMS - mStreams.Length(); |
michael@0 | 915 | } |
michael@0 | 916 | if (aNeeded <= 0) { |
michael@0 | 917 | return false; |
michael@0 | 918 | } |
michael@0 | 919 | |
michael@0 | 920 | len = (socklen_t)sizeof(struct sctp_status); |
michael@0 | 921 | if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) { |
michael@0 | 922 | LOG(("***failed: getsockopt SCTP_STATUS")); |
michael@0 | 923 | return false; |
michael@0 | 924 | } |
michael@0 | 925 | outStreamsNeeded = aNeeded; // number to add |
michael@0 | 926 | |
michael@0 | 927 | // Note: if multiple channel opens happen when we don't have enough space, |
michael@0 | 928 | // we'll call RequestMoreStreams() multiple times |
michael@0 | 929 | memset(&sas, 0, sizeof(sas)); |
michael@0 | 930 | sas.sas_instrms = 0; |
michael@0 | 931 | sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */ |
michael@0 | 932 | // Doesn't block, we get an event when it succeeds or fails |
michael@0 | 933 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, |
michael@0 | 934 | (socklen_t) sizeof(struct sctp_add_streams)) < 0) { |
michael@0 | 935 | if (errno == EALREADY) { |
michael@0 | 936 | LOG(("Already have %u output streams", outStreamsNeeded)); |
michael@0 | 937 | return true; |
michael@0 | 938 | } |
michael@0 | 939 | |
michael@0 | 940 | LOG(("***failed: setsockopt ADD errno=%d", errno)); |
michael@0 | 941 | return false; |
michael@0 | 942 | } |
michael@0 | 943 | LOG(("Requested %u more streams", outStreamsNeeded)); |
michael@0 | 944 | // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the |
michael@0 | 945 | // values are larger than mStreams.Length() |
michael@0 | 946 | return true; |
michael@0 | 947 | } |
michael@0 | 948 | |
michael@0 | 949 | int32_t |
michael@0 | 950 | DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream) |
michael@0 | 951 | { |
michael@0 | 952 | struct sctp_sndinfo sndinfo; |
michael@0 | 953 | |
michael@0 | 954 | // Note: Main-thread IO, but doesn't block |
michael@0 | 955 | memset(&sndinfo, 0, sizeof(struct sctp_sndinfo)); |
michael@0 | 956 | sndinfo.snd_sid = stream; |
michael@0 | 957 | sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); |
michael@0 | 958 | if (usrsctp_sendv(mSocket, msg, len, nullptr, 0, |
michael@0 | 959 | &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), |
michael@0 | 960 | SCTP_SENDV_SNDINFO, 0) < 0) { |
michael@0 | 961 | //LOG(("***failed: sctp_sendv")); don't log because errno is a return! |
michael@0 | 962 | return (0); |
michael@0 | 963 | } |
michael@0 | 964 | return (1); |
michael@0 | 965 | } |
michael@0 | 966 | |
michael@0 | 967 | int32_t |
michael@0 | 968 | DataChannelConnection::SendOpenAckMessage(uint16_t stream) |
michael@0 | 969 | { |
michael@0 | 970 | struct rtcweb_datachannel_ack ack; |
michael@0 | 971 | |
michael@0 | 972 | memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack)); |
michael@0 | 973 | ack.msg_type = DATA_CHANNEL_ACK; |
michael@0 | 974 | |
michael@0 | 975 | return SendControlMessage(&ack, sizeof(ack), stream); |
michael@0 | 976 | } |
michael@0 | 977 | |
michael@0 | 978 | int32_t |
michael@0 | 979 | DataChannelConnection::SendOpenRequestMessage(const nsACString& label, |
michael@0 | 980 | const nsACString& protocol, |
michael@0 | 981 | uint16_t stream, bool unordered, |
michael@0 | 982 | uint16_t prPolicy, uint32_t prValue) |
michael@0 | 983 | { |
michael@0 | 984 | int label_len = label.Length(); // not including nul |
michael@0 | 985 | int proto_len = protocol.Length(); // not including nul |
michael@0 | 986 | struct rtcweb_datachannel_open_request *req = |
michael@0 | 987 | (struct rtcweb_datachannel_open_request*) moz_xmalloc((sizeof(*req)-1) + label_len + proto_len); |
michael@0 | 988 | // careful - request includes 1 char label |
michael@0 | 989 | |
michael@0 | 990 | memset(req, 0, sizeof(struct rtcweb_datachannel_open_request)); |
michael@0 | 991 | req->msg_type = DATA_CHANNEL_OPEN_REQUEST; |
michael@0 | 992 | switch (prPolicy) { |
michael@0 | 993 | case SCTP_PR_SCTP_NONE: |
michael@0 | 994 | req->channel_type = DATA_CHANNEL_RELIABLE; |
michael@0 | 995 | break; |
michael@0 | 996 | case SCTP_PR_SCTP_TTL: |
michael@0 | 997 | req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED; |
michael@0 | 998 | break; |
michael@0 | 999 | case SCTP_PR_SCTP_RTX: |
michael@0 | 1000 | req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT; |
michael@0 | 1001 | break; |
michael@0 | 1002 | default: |
michael@0 | 1003 | // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno! |
michael@0 | 1004 | moz_free(req); |
michael@0 | 1005 | return (0); |
michael@0 | 1006 | } |
michael@0 | 1007 | if (unordered) { |
michael@0 | 1008 | // Per the current types, all differ by 0x80 between ordered and unordered |
michael@0 | 1009 | req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future |
michael@0 | 1010 | } |
michael@0 | 1011 | |
michael@0 | 1012 | req->reliability_param = htonl(prValue); |
michael@0 | 1013 | req->priority = htons(0); /* XXX: add support */ |
michael@0 | 1014 | req->label_length = htons(label_len); |
michael@0 | 1015 | req->protocol_length = htons(proto_len); |
michael@0 | 1016 | memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len); |
michael@0 | 1017 | memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len); |
michael@0 | 1018 | |
michael@0 | 1019 | // sizeof(*req) already includes +1 byte for label, need nul for both strings |
michael@0 | 1020 | int32_t result = SendControlMessage(req, (sizeof(*req)-1) + label_len + proto_len, stream); |
michael@0 | 1021 | |
michael@0 | 1022 | moz_free(req); |
michael@0 | 1023 | return result; |
michael@0 | 1024 | } |
michael@0 | 1025 | |
michael@0 | 1026 | // XXX This should use a separate thread (outbound queue) which should |
michael@0 | 1027 | // select() to know when to *try* to send data to the socket again. |
michael@0 | 1028 | // Alternatively, it can use a timeout, but that's guaranteed to be wrong |
michael@0 | 1029 | // (just not sure in what direction). We could re-implement NSPR's |
michael@0 | 1030 | // PR_POLL_WRITE/etc handling... with a lot of work. |
michael@0 | 1031 | |
michael@0 | 1032 | // Better yet, use the SCTP stack's notifications on buffer state to avoid |
michael@0 | 1033 | // filling the SCTP's buffers. |
michael@0 | 1034 | |
michael@0 | 1035 | // returns if we're still blocked or not |
michael@0 | 1036 | bool |
michael@0 | 1037 | DataChannelConnection::SendDeferredMessages() |
michael@0 | 1038 | { |
michael@0 | 1039 | uint32_t i; |
michael@0 | 1040 | nsRefPtr<DataChannel> channel; // we may null out the refs to this |
michael@0 | 1041 | bool still_blocked = false; |
michael@0 | 1042 | bool sent = false; |
michael@0 | 1043 | |
michael@0 | 1044 | // This may block while something is modifying channels, but should not block for IO |
michael@0 | 1045 | MutexAutoLock lock(mLock); |
michael@0 | 1046 | |
michael@0 | 1047 | // XXX For total fairness, on a still_blocked we'd start next time at the |
michael@0 | 1048 | // same index. Sorry, not going to bother for now. |
michael@0 | 1049 | for (i = 0; i < mStreams.Length(); ++i) { |
michael@0 | 1050 | channel = mStreams[i]; |
michael@0 | 1051 | if (!channel) |
michael@0 | 1052 | continue; |
michael@0 | 1053 | |
michael@0 | 1054 | // Only one of these should be set.... |
michael@0 | 1055 | if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) { |
michael@0 | 1056 | if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol, |
michael@0 | 1057 | channel->mStream, |
michael@0 | 1058 | channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED, |
michael@0 | 1059 | channel->mPrPolicy, channel->mPrValue)) { |
michael@0 | 1060 | channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ; |
michael@0 | 1061 | |
michael@0 | 1062 | channel->mState = OPEN; |
michael@0 | 1063 | channel->mReady = true; |
michael@0 | 1064 | LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); |
michael@0 | 1065 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 1066 | DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, |
michael@0 | 1067 | channel)); |
michael@0 | 1068 | sent = true; |
michael@0 | 1069 | } else { |
michael@0 | 1070 | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
michael@0 | 1071 | still_blocked = true; |
michael@0 | 1072 | } else { |
michael@0 | 1073 | // Close the channel, inform the user |
michael@0 | 1074 | mStreams[channel->mStream] = nullptr; |
michael@0 | 1075 | channel->mState = CLOSED; |
michael@0 | 1076 | // Don't need to reset; we didn't open it |
michael@0 | 1077 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 1078 | DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
michael@0 | 1079 | channel)); |
michael@0 | 1080 | } |
michael@0 | 1081 | } |
michael@0 | 1082 | } |
michael@0 | 1083 | if (still_blocked) |
michael@0 | 1084 | break; |
michael@0 | 1085 | |
michael@0 | 1086 | if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) { |
michael@0 | 1087 | if (SendOpenAckMessage(channel->mStream)) { |
michael@0 | 1088 | channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK; |
michael@0 | 1089 | sent = true; |
michael@0 | 1090 | } else { |
michael@0 | 1091 | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
michael@0 | 1092 | still_blocked = true; |
michael@0 | 1093 | } else { |
michael@0 | 1094 | // Close the channel, inform the user |
michael@0 | 1095 | CloseInt(channel); |
michael@0 | 1096 | // XXX send error via DataChannelOnMessageAvailable (bug 843625) |
michael@0 | 1097 | } |
michael@0 | 1098 | } |
michael@0 | 1099 | } |
michael@0 | 1100 | if (still_blocked) |
michael@0 | 1101 | break; |
michael@0 | 1102 | |
michael@0 | 1103 | if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) { |
michael@0 | 1104 | bool failed_send = false; |
michael@0 | 1105 | int32_t result; |
michael@0 | 1106 | |
michael@0 | 1107 | if (channel->mState == CLOSED || channel->mState == CLOSING) { |
michael@0 | 1108 | channel->mBufferedData.Clear(); |
michael@0 | 1109 | } |
michael@0 | 1110 | while (!channel->mBufferedData.IsEmpty() && |
michael@0 | 1111 | !failed_send) { |
michael@0 | 1112 | struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa; |
michael@0 | 1113 | const char *data = channel->mBufferedData[0]->mData; |
michael@0 | 1114 | uint32_t len = channel->mBufferedData[0]->mLength; |
michael@0 | 1115 | |
michael@0 | 1116 | // SCTP will return EMSGSIZE if the message is bigger than the buffer |
michael@0 | 1117 | // size (or EAGAIN if there isn't space) |
michael@0 | 1118 | if ((result = usrsctp_sendv(mSocket, data, len, |
michael@0 | 1119 | nullptr, 0, |
michael@0 | 1120 | (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa), |
michael@0 | 1121 | SCTP_SENDV_SPA, |
michael@0 | 1122 | 0) < 0)) { |
michael@0 | 1123 | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
michael@0 | 1124 | // leave queued for resend |
michael@0 | 1125 | failed_send = true; |
michael@0 | 1126 | LOG(("queue full again when resending %d bytes (%d)", len, result)); |
michael@0 | 1127 | } else { |
michael@0 | 1128 | LOG(("error %d re-sending string", errno)); |
michael@0 | 1129 | failed_send = true; |
michael@0 | 1130 | } |
michael@0 | 1131 | } else { |
michael@0 | 1132 | LOG(("Resent buffer of %d bytes (%d)", len, result)); |
michael@0 | 1133 | sent = true; |
michael@0 | 1134 | channel->mBufferedData.RemoveElementAt(0); |
michael@0 | 1135 | } |
michael@0 | 1136 | } |
michael@0 | 1137 | if (channel->mBufferedData.IsEmpty()) |
michael@0 | 1138 | channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA; |
michael@0 | 1139 | else |
michael@0 | 1140 | still_blocked = true; |
michael@0 | 1141 | } |
michael@0 | 1142 | if (still_blocked) |
michael@0 | 1143 | break; |
michael@0 | 1144 | } |
michael@0 | 1145 | |
michael@0 | 1146 | if (!still_blocked) { |
michael@0 | 1147 | // mDeferTimeout becomes an estimate of how long we need to wait next time we block |
michael@0 | 1148 | return false; |
michael@0 | 1149 | } |
michael@0 | 1150 | // adjust time? More time for next wait if we didn't send anything, less if did |
michael@0 | 1151 | // Pretty crude, but better than nothing; just to keep CPU use down |
michael@0 | 1152 | if (!sent && mDeferTimeout < 50) |
michael@0 | 1153 | mDeferTimeout++; |
michael@0 | 1154 | else if (sent && mDeferTimeout > 10) |
michael@0 | 1155 | mDeferTimeout--; |
michael@0 | 1156 | |
michael@0 | 1157 | return true; |
michael@0 | 1158 | } |
michael@0 | 1159 | |
michael@0 | 1160 | void |
michael@0 | 1161 | DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, |
michael@0 | 1162 | size_t length, |
michael@0 | 1163 | uint16_t stream) |
michael@0 | 1164 | { |
michael@0 | 1165 | nsRefPtr<DataChannel> channel; |
michael@0 | 1166 | uint32_t prValue; |
michael@0 | 1167 | uint16_t prPolicy; |
michael@0 | 1168 | uint32_t flags; |
michael@0 | 1169 | |
michael@0 | 1170 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 1171 | |
michael@0 | 1172 | if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) { |
michael@0 | 1173 | LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length, |
michael@0 | 1174 | (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))); |
michael@0 | 1175 | if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) |
michael@0 | 1176 | return; |
michael@0 | 1177 | } |
michael@0 | 1178 | |
michael@0 | 1179 | LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req))); |
michael@0 | 1180 | |
michael@0 | 1181 | switch (req->channel_type) { |
michael@0 | 1182 | case DATA_CHANNEL_RELIABLE: |
michael@0 | 1183 | case DATA_CHANNEL_RELIABLE_UNORDERED: |
michael@0 | 1184 | prPolicy = SCTP_PR_SCTP_NONE; |
michael@0 | 1185 | break; |
michael@0 | 1186 | case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: |
michael@0 | 1187 | case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED: |
michael@0 | 1188 | prPolicy = SCTP_PR_SCTP_RTX; |
michael@0 | 1189 | break; |
michael@0 | 1190 | case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: |
michael@0 | 1191 | case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED: |
michael@0 | 1192 | prPolicy = SCTP_PR_SCTP_TTL; |
michael@0 | 1193 | break; |
michael@0 | 1194 | default: |
michael@0 | 1195 | LOG(("Unknown channel type", req->channel_type)); |
michael@0 | 1196 | /* XXX error handling */ |
michael@0 | 1197 | return; |
michael@0 | 1198 | } |
michael@0 | 1199 | prValue = ntohl(req->reliability_param); |
michael@0 | 1200 | flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; |
michael@0 | 1201 | |
michael@0 | 1202 | if ((channel = FindChannelByStream(stream))) { |
michael@0 | 1203 | if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { |
michael@0 | 1204 | LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.", |
michael@0 | 1205 | stream, channel->mState)); |
michael@0 | 1206 | /* XXX: some error handling */ |
michael@0 | 1207 | } else { |
michael@0 | 1208 | LOG(("Open for externally negotiated channel %u", stream)); |
michael@0 | 1209 | // XXX should also check protocol, maybe label |
michael@0 | 1210 | if (prPolicy != channel->mPrPolicy || |
michael@0 | 1211 | prValue != channel->mPrValue || |
michael@0 | 1212 | flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) |
michael@0 | 1213 | { |
michael@0 | 1214 | LOG(("WARNING: external negotiation mismatch with OpenRequest:" |
michael@0 | 1215 | "channel %u, policy %u/%u, value %u/%u, flags %x/%x", |
michael@0 | 1216 | stream, prPolicy, channel->mPrPolicy, |
michael@0 | 1217 | prValue, channel->mPrValue, flags, channel->mFlags)); |
michael@0 | 1218 | } |
michael@0 | 1219 | } |
michael@0 | 1220 | return; |
michael@0 | 1221 | } |
michael@0 | 1222 | if (stream >= mStreams.Length()) { |
michael@0 | 1223 | LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length())); |
michael@0 | 1224 | return; |
michael@0 | 1225 | } |
michael@0 | 1226 | |
michael@0 | 1227 | nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length))); |
michael@0 | 1228 | nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)], |
michael@0 | 1229 | ntohs(req->protocol_length))); |
michael@0 | 1230 | |
michael@0 | 1231 | channel = new DataChannel(this, |
michael@0 | 1232 | stream, |
michael@0 | 1233 | DataChannel::CONNECTING, |
michael@0 | 1234 | label, |
michael@0 | 1235 | protocol, |
michael@0 | 1236 | prPolicy, prValue, |
michael@0 | 1237 | flags, |
michael@0 | 1238 | nullptr, nullptr); |
michael@0 | 1239 | mStreams[stream] = channel; |
michael@0 | 1240 | |
michael@0 | 1241 | channel->mState = DataChannel::WAITING_TO_OPEN; |
michael@0 | 1242 | |
michael@0 | 1243 | LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__, |
michael@0 | 1244 | channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState)); |
michael@0 | 1245 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 1246 | DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, |
michael@0 | 1247 | this, channel)); |
michael@0 | 1248 | |
michael@0 | 1249 | LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); |
michael@0 | 1250 | |
michael@0 | 1251 | if (!SendOpenAckMessage(stream)) { |
michael@0 | 1252 | // XXX Only on EAGAIN!? And if not, then close the channel?? |
michael@0 | 1253 | channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK; |
michael@0 | 1254 | StartDefer(); |
michael@0 | 1255 | } |
michael@0 | 1256 | |
michael@0 | 1257 | // Now process any queued data messages for the channel (which will |
michael@0 | 1258 | // themselves likely get queued until we leave WAITING_TO_OPEN, plus any |
michael@0 | 1259 | // more that come in before that happens) |
michael@0 | 1260 | DeliverQueuedData(stream); |
michael@0 | 1261 | } |
michael@0 | 1262 | |
michael@0 | 1263 | // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. |
michael@0 | 1264 | // That would make this code moot. Keep it for now for backwards compatibility. |
michael@0 | 1265 | void |
michael@0 | 1266 | DataChannelConnection::DeliverQueuedData(uint16_t stream) |
michael@0 | 1267 | { |
michael@0 | 1268 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 1269 | |
michael@0 | 1270 | uint32_t i = 0; |
michael@0 | 1271 | while (i < mQueuedData.Length()) { |
michael@0 | 1272 | // Careful! we may modify the array length from within the loop! |
michael@0 | 1273 | if (mQueuedData[i]->mStream == stream) { |
michael@0 | 1274 | LOG(("Delivering queued data for stream %u, length %u", |
michael@0 | 1275 | stream, mQueuedData[i]->mLength)); |
michael@0 | 1276 | // Deliver the queued data |
michael@0 | 1277 | HandleDataMessage(mQueuedData[i]->mPpid, |
michael@0 | 1278 | mQueuedData[i]->mData, mQueuedData[i]->mLength, |
michael@0 | 1279 | mQueuedData[i]->mStream); |
michael@0 | 1280 | mQueuedData.RemoveElementAt(i); |
michael@0 | 1281 | continue; // don't bump index since we removed the element |
michael@0 | 1282 | } |
michael@0 | 1283 | i++; |
michael@0 | 1284 | } |
michael@0 | 1285 | } |
michael@0 | 1286 | |
michael@0 | 1287 | void |
michael@0 | 1288 | DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, |
michael@0 | 1289 | size_t length, uint16_t stream) |
michael@0 | 1290 | { |
michael@0 | 1291 | DataChannel *channel; |
michael@0 | 1292 | |
michael@0 | 1293 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 1294 | |
michael@0 | 1295 | channel = FindChannelByStream(stream); |
michael@0 | 1296 | NS_ENSURE_TRUE_VOID(channel); |
michael@0 | 1297 | |
michael@0 | 1298 | LOG(("OpenAck received for stream %u, waiting=%d", stream, |
michael@0 | 1299 | (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0)); |
michael@0 | 1300 | |
michael@0 | 1301 | channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK; |
michael@0 | 1302 | } |
michael@0 | 1303 | |
michael@0 | 1304 | void |
michael@0 | 1305 | DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream) |
michael@0 | 1306 | { |
michael@0 | 1307 | /* XXX: Send an error message? */ |
michael@0 | 1308 | LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream)); |
michael@0 | 1309 | // XXX Log to JS error console if possible |
michael@0 | 1310 | } |
michael@0 | 1311 | |
michael@0 | 1312 | void |
michael@0 | 1313 | DataChannelConnection::HandleDataMessage(uint32_t ppid, |
michael@0 | 1314 | const void *data, size_t length, |
michael@0 | 1315 | uint16_t stream) |
michael@0 | 1316 | { |
michael@0 | 1317 | DataChannel *channel; |
michael@0 | 1318 | const char *buffer = (const char *) data; |
michael@0 | 1319 | |
michael@0 | 1320 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 1321 | |
michael@0 | 1322 | channel = FindChannelByStream(stream); |
michael@0 | 1323 | |
michael@0 | 1324 | // XXX A closed channel may trip this... check |
michael@0 | 1325 | // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. |
michael@0 | 1326 | // That would make this code moot. Keep it for now for backwards compatibility. |
michael@0 | 1327 | if (!channel) { |
michael@0 | 1328 | // In the updated 0-RTT open case, the sender can send data immediately |
michael@0 | 1329 | // after Open, and doesn't set the in-order bit (since we don't have a |
michael@0 | 1330 | // response or ack). Also, with external negotiation, data can come in |
michael@0 | 1331 | // before we're told about the external negotiation. We need to buffer |
michael@0 | 1332 | // data until either a) Open comes in, if the ordering get messed up, |
michael@0 | 1333 | // or b) the app tells us this channel was externally negotiated. When |
michael@0 | 1334 | // these occur, we deliver the data. |
michael@0 | 1335 | |
michael@0 | 1336 | // Since this is rare and non-performance, keep a single list of queued |
michael@0 | 1337 | // data messages to deliver once the channel opens. |
michael@0 | 1338 | LOG(("Queuing data for stream %u, length %u", stream, length)); |
michael@0 | 1339 | // Copies data |
michael@0 | 1340 | mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length)); |
michael@0 | 1341 | return; |
michael@0 | 1342 | } |
michael@0 | 1343 | |
michael@0 | 1344 | // XXX should this be a simple if, no warnings/debugbreaks? |
michael@0 | 1345 | NS_ENSURE_TRUE_VOID(channel->mState != CLOSED); |
michael@0 | 1346 | |
michael@0 | 1347 | { |
michael@0 | 1348 | nsAutoCString recvData(buffer, length); // copies (<64) or allocates |
michael@0 | 1349 | bool is_binary = true; |
michael@0 | 1350 | |
michael@0 | 1351 | if (ppid == DATA_CHANNEL_PPID_DOMSTRING || |
michael@0 | 1352 | ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) { |
michael@0 | 1353 | is_binary = false; |
michael@0 | 1354 | } |
michael@0 | 1355 | if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) { |
michael@0 | 1356 | NS_WARNING("DataChannel message aborted by fragment type change!"); |
michael@0 | 1357 | channel->mRecvBuffer.Truncate(0); |
michael@0 | 1358 | } |
michael@0 | 1359 | channel->mIsRecvBinary = is_binary; |
michael@0 | 1360 | |
michael@0 | 1361 | switch (ppid) { |
michael@0 | 1362 | case DATA_CHANNEL_PPID_DOMSTRING: |
michael@0 | 1363 | case DATA_CHANNEL_PPID_BINARY: |
michael@0 | 1364 | channel->mRecvBuffer += recvData; |
michael@0 | 1365 | LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u", |
michael@0 | 1366 | is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(), |
michael@0 | 1367 | channel->mStream)); |
michael@0 | 1368 | return; // Not ready to notify application |
michael@0 | 1369 | |
michael@0 | 1370 | case DATA_CHANNEL_PPID_DOMSTRING_LAST: |
michael@0 | 1371 | LOG(("DataChannel: String message received of length %lu on channel %u", |
michael@0 | 1372 | length, channel->mStream)); |
michael@0 | 1373 | if (!channel->mRecvBuffer.IsEmpty()) { |
michael@0 | 1374 | channel->mRecvBuffer += recvData; |
michael@0 | 1375 | LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel)); |
michael@0 | 1376 | channel->SendOrQueue(new DataChannelOnMessageAvailable( |
michael@0 | 1377 | DataChannelOnMessageAvailable::ON_DATA, this, |
michael@0 | 1378 | channel, channel->mRecvBuffer, -1)); |
michael@0 | 1379 | channel->mRecvBuffer.Truncate(0); |
michael@0 | 1380 | return; |
michael@0 | 1381 | } |
michael@0 | 1382 | // else send using recvData normally |
michael@0 | 1383 | length = -1; // Flag for DOMString |
michael@0 | 1384 | |
michael@0 | 1385 | // WebSockets checks IsUTF8() here; we can try to deliver it |
michael@0 | 1386 | break; |
michael@0 | 1387 | |
michael@0 | 1388 | case DATA_CHANNEL_PPID_BINARY_LAST: |
michael@0 | 1389 | LOG(("DataChannel: Received binary message of length %lu on channel id %u", |
michael@0 | 1390 | length, channel->mStream)); |
michael@0 | 1391 | if (!channel->mRecvBuffer.IsEmpty()) { |
michael@0 | 1392 | channel->mRecvBuffer += recvData; |
michael@0 | 1393 | LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel)); |
michael@0 | 1394 | channel->SendOrQueue(new DataChannelOnMessageAvailable( |
michael@0 | 1395 | DataChannelOnMessageAvailable::ON_DATA, this, |
michael@0 | 1396 | channel, channel->mRecvBuffer, |
michael@0 | 1397 | channel->mRecvBuffer.Length())); |
michael@0 | 1398 | channel->mRecvBuffer.Truncate(0); |
michael@0 | 1399 | return; |
michael@0 | 1400 | } |
michael@0 | 1401 | // else send using recvData normally |
michael@0 | 1402 | break; |
michael@0 | 1403 | |
michael@0 | 1404 | default: |
michael@0 | 1405 | NS_ERROR("Unknown data PPID"); |
michael@0 | 1406 | return; |
michael@0 | 1407 | } |
michael@0 | 1408 | /* Notify onmessage */ |
michael@0 | 1409 | LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel)); |
michael@0 | 1410 | channel->SendOrQueue(new DataChannelOnMessageAvailable( |
michael@0 | 1411 | DataChannelOnMessageAvailable::ON_DATA, this, |
michael@0 | 1412 | channel, recvData, length)); |
michael@0 | 1413 | } |
michael@0 | 1414 | } |
michael@0 | 1415 | |
michael@0 | 1416 | // Called with mLock locked! |
michael@0 | 1417 | void |
michael@0 | 1418 | DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream) |
michael@0 | 1419 | { |
michael@0 | 1420 | const struct rtcweb_datachannel_open_request *req; |
michael@0 | 1421 | const struct rtcweb_datachannel_ack *ack; |
michael@0 | 1422 | |
michael@0 | 1423 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 1424 | |
michael@0 | 1425 | switch (ppid) { |
michael@0 | 1426 | case DATA_CHANNEL_PPID_CONTROL: |
michael@0 | 1427 | req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer); |
michael@0 | 1428 | |
michael@0 | 1429 | NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message |
michael@0 | 1430 | switch (req->msg_type) { |
michael@0 | 1431 | case DATA_CHANNEL_OPEN_REQUEST: |
michael@0 | 1432 | // structure includes a possibly-unused char label[1] (in a packed structure) |
michael@0 | 1433 | NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1); |
michael@0 | 1434 | |
michael@0 | 1435 | HandleOpenRequestMessage(req, length, stream); |
michael@0 | 1436 | break; |
michael@0 | 1437 | case DATA_CHANNEL_ACK: |
michael@0 | 1438 | // >= sizeof(*ack) checked above |
michael@0 | 1439 | |
michael@0 | 1440 | ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer); |
michael@0 | 1441 | HandleOpenAckMessage(ack, length, stream); |
michael@0 | 1442 | break; |
michael@0 | 1443 | default: |
michael@0 | 1444 | HandleUnknownMessage(ppid, length, stream); |
michael@0 | 1445 | break; |
michael@0 | 1446 | } |
michael@0 | 1447 | break; |
michael@0 | 1448 | case DATA_CHANNEL_PPID_DOMSTRING: |
michael@0 | 1449 | case DATA_CHANNEL_PPID_DOMSTRING_LAST: |
michael@0 | 1450 | case DATA_CHANNEL_PPID_BINARY: |
michael@0 | 1451 | case DATA_CHANNEL_PPID_BINARY_LAST: |
michael@0 | 1452 | HandleDataMessage(ppid, buffer, length, stream); |
michael@0 | 1453 | break; |
michael@0 | 1454 | default: |
michael@0 | 1455 | LOG(("Message of length %lu, PPID %u on stream %u received.", |
michael@0 | 1456 | length, ppid, stream)); |
michael@0 | 1457 | break; |
michael@0 | 1458 | } |
michael@0 | 1459 | } |
michael@0 | 1460 | |
michael@0 | 1461 | void |
michael@0 | 1462 | DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac) |
michael@0 | 1463 | { |
michael@0 | 1464 | uint32_t i, n; |
michael@0 | 1465 | |
michael@0 | 1466 | switch (sac->sac_state) { |
michael@0 | 1467 | case SCTP_COMM_UP: |
michael@0 | 1468 | LOG(("Association change: SCTP_COMM_UP")); |
michael@0 | 1469 | if (mState == CONNECTING) { |
michael@0 | 1470 | mSocket = mMasterSocket; |
michael@0 | 1471 | mState = OPEN; |
michael@0 | 1472 | |
michael@0 | 1473 | SetEvenOdd(); |
michael@0 | 1474 | |
michael@0 | 1475 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 1476 | DataChannelOnMessageAvailable::ON_CONNECTION, |
michael@0 | 1477 | this, true)); |
michael@0 | 1478 | LOG(("DTLS connect() succeeded! Entering connected mode")); |
michael@0 | 1479 | |
michael@0 | 1480 | // Open any streams pending... |
michael@0 | 1481 | ProcessQueuedOpens(); |
michael@0 | 1482 | |
michael@0 | 1483 | } else if (mState == OPEN) { |
michael@0 | 1484 | LOG(("DataConnection Already OPEN")); |
michael@0 | 1485 | } else { |
michael@0 | 1486 | LOG(("Unexpected state: %d", mState)); |
michael@0 | 1487 | } |
michael@0 | 1488 | break; |
michael@0 | 1489 | case SCTP_COMM_LOST: |
michael@0 | 1490 | LOG(("Association change: SCTP_COMM_LOST")); |
michael@0 | 1491 | // This association is toast, so also close all the channels -- from mainthread! |
michael@0 | 1492 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 1493 | DataChannelOnMessageAvailable::ON_DISCONNECTED, |
michael@0 | 1494 | this)); |
michael@0 | 1495 | break; |
michael@0 | 1496 | case SCTP_RESTART: |
michael@0 | 1497 | LOG(("Association change: SCTP_RESTART")); |
michael@0 | 1498 | break; |
michael@0 | 1499 | case SCTP_SHUTDOWN_COMP: |
michael@0 | 1500 | LOG(("Association change: SCTP_SHUTDOWN_COMP")); |
michael@0 | 1501 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 1502 | DataChannelOnMessageAvailable::ON_DISCONNECTED, |
michael@0 | 1503 | this)); |
michael@0 | 1504 | break; |
michael@0 | 1505 | case SCTP_CANT_STR_ASSOC: |
michael@0 | 1506 | LOG(("Association change: SCTP_CANT_STR_ASSOC")); |
michael@0 | 1507 | break; |
michael@0 | 1508 | default: |
michael@0 | 1509 | LOG(("Association change: UNKNOWN")); |
michael@0 | 1510 | break; |
michael@0 | 1511 | } |
michael@0 | 1512 | LOG(("Association change: streams (in/out) = (%u/%u)", |
michael@0 | 1513 | sac->sac_inbound_streams, sac->sac_outbound_streams)); |
michael@0 | 1514 | |
michael@0 | 1515 | NS_ENSURE_TRUE_VOID(sac); |
michael@0 | 1516 | n = sac->sac_length - sizeof(*sac); |
michael@0 | 1517 | if (((sac->sac_state == SCTP_COMM_UP) || |
michael@0 | 1518 | (sac->sac_state == SCTP_RESTART)) && (n > 0)) { |
michael@0 | 1519 | for (i = 0; i < n; ++i) { |
michael@0 | 1520 | switch (sac->sac_info[i]) { |
michael@0 | 1521 | case SCTP_ASSOC_SUPPORTS_PR: |
michael@0 | 1522 | LOG(("Supports: PR")); |
michael@0 | 1523 | break; |
michael@0 | 1524 | case SCTP_ASSOC_SUPPORTS_AUTH: |
michael@0 | 1525 | LOG(("Supports: AUTH")); |
michael@0 | 1526 | break; |
michael@0 | 1527 | case SCTP_ASSOC_SUPPORTS_ASCONF: |
michael@0 | 1528 | LOG(("Supports: ASCONF")); |
michael@0 | 1529 | break; |
michael@0 | 1530 | case SCTP_ASSOC_SUPPORTS_MULTIBUF: |
michael@0 | 1531 | LOG(("Supports: MULTIBUF")); |
michael@0 | 1532 | break; |
michael@0 | 1533 | case SCTP_ASSOC_SUPPORTS_RE_CONFIG: |
michael@0 | 1534 | LOG(("Supports: RE-CONFIG")); |
michael@0 | 1535 | break; |
michael@0 | 1536 | default: |
michael@0 | 1537 | LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i])); |
michael@0 | 1538 | break; |
michael@0 | 1539 | } |
michael@0 | 1540 | } |
michael@0 | 1541 | } else if (((sac->sac_state == SCTP_COMM_LOST) || |
michael@0 | 1542 | (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) { |
michael@0 | 1543 | LOG(("Association: ABORT =")); |
michael@0 | 1544 | for (i = 0; i < n; ++i) { |
michael@0 | 1545 | LOG((" 0x%02x", sac->sac_info[i])); |
michael@0 | 1546 | } |
michael@0 | 1547 | } |
michael@0 | 1548 | if ((sac->sac_state == SCTP_CANT_STR_ASSOC) || |
michael@0 | 1549 | (sac->sac_state == SCTP_SHUTDOWN_COMP) || |
michael@0 | 1550 | (sac->sac_state == SCTP_COMM_LOST)) { |
michael@0 | 1551 | return; |
michael@0 | 1552 | } |
michael@0 | 1553 | } |
michael@0 | 1554 | |
michael@0 | 1555 | void |
michael@0 | 1556 | DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc) |
michael@0 | 1557 | { |
michael@0 | 1558 | char addr_buf[INET6_ADDRSTRLEN]; |
michael@0 | 1559 | const char *addr = ""; |
michael@0 | 1560 | struct sockaddr_in *sin; |
michael@0 | 1561 | struct sockaddr_in6 *sin6; |
michael@0 | 1562 | #if defined(__Userspace_os_Windows) |
michael@0 | 1563 | DWORD addr_len = INET6_ADDRSTRLEN; |
michael@0 | 1564 | #endif |
michael@0 | 1565 | |
michael@0 | 1566 | switch (spc->spc_aaddr.ss_family) { |
michael@0 | 1567 | case AF_INET: |
michael@0 | 1568 | sin = (struct sockaddr_in *)&spc->spc_aaddr; |
michael@0 | 1569 | #if !defined(__Userspace_os_Windows) |
michael@0 | 1570 | addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN); |
michael@0 | 1571 | #else |
michael@0 | 1572 | if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr, |
michael@0 | 1573 | addr_buf, &addr_len)) { |
michael@0 | 1574 | return; |
michael@0 | 1575 | } |
michael@0 | 1576 | #endif |
michael@0 | 1577 | break; |
michael@0 | 1578 | case AF_INET6: |
michael@0 | 1579 | sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr; |
michael@0 | 1580 | #if !defined(__Userspace_os_Windows) |
michael@0 | 1581 | addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN); |
michael@0 | 1582 | #else |
michael@0 | 1583 | if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr, |
michael@0 | 1584 | addr_buf, &addr_len)) { |
michael@0 | 1585 | return; |
michael@0 | 1586 | } |
michael@0 | 1587 | #endif |
michael@0 | 1588 | case AF_CONN: |
michael@0 | 1589 | addr = "DTLS connection"; |
michael@0 | 1590 | break; |
michael@0 | 1591 | default: |
michael@0 | 1592 | break; |
michael@0 | 1593 | } |
michael@0 | 1594 | LOG(("Peer address %s is now ", addr)); |
michael@0 | 1595 | switch (spc->spc_state) { |
michael@0 | 1596 | case SCTP_ADDR_AVAILABLE: |
michael@0 | 1597 | LOG(("SCTP_ADDR_AVAILABLE")); |
michael@0 | 1598 | break; |
michael@0 | 1599 | case SCTP_ADDR_UNREACHABLE: |
michael@0 | 1600 | LOG(("SCTP_ADDR_UNREACHABLE")); |
michael@0 | 1601 | break; |
michael@0 | 1602 | case SCTP_ADDR_REMOVED: |
michael@0 | 1603 | LOG(("SCTP_ADDR_REMOVED")); |
michael@0 | 1604 | break; |
michael@0 | 1605 | case SCTP_ADDR_ADDED: |
michael@0 | 1606 | LOG(("SCTP_ADDR_ADDED")); |
michael@0 | 1607 | break; |
michael@0 | 1608 | case SCTP_ADDR_MADE_PRIM: |
michael@0 | 1609 | LOG(("SCTP_ADDR_MADE_PRIM")); |
michael@0 | 1610 | break; |
michael@0 | 1611 | case SCTP_ADDR_CONFIRMED: |
michael@0 | 1612 | LOG(("SCTP_ADDR_CONFIRMED")); |
michael@0 | 1613 | break; |
michael@0 | 1614 | default: |
michael@0 | 1615 | LOG(("UNKNOWN")); |
michael@0 | 1616 | break; |
michael@0 | 1617 | } |
michael@0 | 1618 | LOG((" (error = 0x%08x).\n", spc->spc_error)); |
michael@0 | 1619 | } |
michael@0 | 1620 | |
michael@0 | 1621 | void |
michael@0 | 1622 | DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre) |
michael@0 | 1623 | { |
michael@0 | 1624 | size_t i, n; |
michael@0 | 1625 | |
michael@0 | 1626 | n = sre->sre_length - sizeof(struct sctp_remote_error); |
michael@0 | 1627 | LOG(("Remote Error (error = 0x%04x): ", sre->sre_error)); |
michael@0 | 1628 | for (i = 0; i < n; ++i) { |
michael@0 | 1629 | LOG((" 0x%02x", sre-> sre_data[i])); |
michael@0 | 1630 | } |
michael@0 | 1631 | } |
michael@0 | 1632 | |
michael@0 | 1633 | void |
michael@0 | 1634 | DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse) |
michael@0 | 1635 | { |
michael@0 | 1636 | LOG(("Shutdown event.")); |
michael@0 | 1637 | /* XXX: notify all channels. */ |
michael@0 | 1638 | // Attempts to actually send anything will fail |
michael@0 | 1639 | } |
michael@0 | 1640 | |
michael@0 | 1641 | void |
michael@0 | 1642 | DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai) |
michael@0 | 1643 | { |
michael@0 | 1644 | LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind)); |
michael@0 | 1645 | } |
michael@0 | 1646 | |
michael@0 | 1647 | void |
michael@0 | 1648 | DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe) |
michael@0 | 1649 | { |
michael@0 | 1650 | size_t i, n; |
michael@0 | 1651 | |
michael@0 | 1652 | if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) { |
michael@0 | 1653 | LOG(("Unsent ")); |
michael@0 | 1654 | } |
michael@0 | 1655 | if (ssfe->ssfe_flags & SCTP_DATA_SENT) { |
michael@0 | 1656 | LOG(("Sent ")); |
michael@0 | 1657 | } |
michael@0 | 1658 | if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) { |
michael@0 | 1659 | LOG(("(flags = %x) ", ssfe->ssfe_flags)); |
michael@0 | 1660 | } |
michael@0 | 1661 | LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x", |
michael@0 | 1662 | ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, |
michael@0 | 1663 | ssfe->ssfe_info.snd_flags, ssfe->ssfe_error)); |
michael@0 | 1664 | n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); |
michael@0 | 1665 | for (i = 0; i < n; ++i) { |
michael@0 | 1666 | LOG((" 0x%02x", ssfe->ssfe_data[i])); |
michael@0 | 1667 | } |
michael@0 | 1668 | } |
michael@0 | 1669 | |
michael@0 | 1670 | void |
michael@0 | 1671 | DataChannelConnection::ClearResets() |
michael@0 | 1672 | { |
michael@0 | 1673 | // Clear all pending resets |
michael@0 | 1674 | if (!mStreamsResetting.IsEmpty()) { |
michael@0 | 1675 | LOG(("Clearing resets for %d streams", mStreamsResetting.Length())); |
michael@0 | 1676 | } |
michael@0 | 1677 | |
michael@0 | 1678 | for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) { |
michael@0 | 1679 | nsRefPtr<DataChannel> channel; |
michael@0 | 1680 | channel = FindChannelByStream(mStreamsResetting[i]); |
michael@0 | 1681 | if (channel) { |
michael@0 | 1682 | LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get())); |
michael@0 | 1683 | mStreams[channel->mStream] = nullptr; |
michael@0 | 1684 | } |
michael@0 | 1685 | } |
michael@0 | 1686 | mStreamsResetting.Clear(); |
michael@0 | 1687 | } |
michael@0 | 1688 | |
michael@0 | 1689 | void |
michael@0 | 1690 | DataChannelConnection::ResetOutgoingStream(uint16_t stream) |
michael@0 | 1691 | { |
michael@0 | 1692 | uint32_t i; |
michael@0 | 1693 | |
michael@0 | 1694 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 1695 | LOG(("Connection %p: Resetting outgoing stream %u", |
michael@0 | 1696 | (void *) this, stream)); |
michael@0 | 1697 | // Rarely has more than a couple items and only for a short time |
michael@0 | 1698 | for (i = 0; i < mStreamsResetting.Length(); ++i) { |
michael@0 | 1699 | if (mStreamsResetting[i] == stream) { |
michael@0 | 1700 | return; |
michael@0 | 1701 | } |
michael@0 | 1702 | } |
michael@0 | 1703 | mStreamsResetting.AppendElement(stream); |
michael@0 | 1704 | } |
michael@0 | 1705 | |
michael@0 | 1706 | void |
michael@0 | 1707 | DataChannelConnection::SendOutgoingStreamReset() |
michael@0 | 1708 | { |
michael@0 | 1709 | struct sctp_reset_streams *srs; |
michael@0 | 1710 | uint32_t i; |
michael@0 | 1711 | size_t len; |
michael@0 | 1712 | |
michael@0 | 1713 | LOG(("Connection %p: Sending outgoing stream reset for %d streams", |
michael@0 | 1714 | (void *) this, mStreamsResetting.Length())); |
michael@0 | 1715 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 1716 | if (mStreamsResetting.IsEmpty()) { |
michael@0 | 1717 | LOG(("No streams to reset")); |
michael@0 | 1718 | return; |
michael@0 | 1719 | } |
michael@0 | 1720 | len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t); |
michael@0 | 1721 | srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc |
michael@0 | 1722 | memset(srs, 0, len); |
michael@0 | 1723 | srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; |
michael@0 | 1724 | srs->srs_number_streams = mStreamsResetting.Length(); |
michael@0 | 1725 | for (i = 0; i < mStreamsResetting.Length(); ++i) { |
michael@0 | 1726 | srs->srs_stream_list[i] = mStreamsResetting[i]; |
michael@0 | 1727 | } |
michael@0 | 1728 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) { |
michael@0 | 1729 | LOG(("***failed: setsockopt RESET, errno %d", errno)); |
michael@0 | 1730 | // if errno == EALREADY, this is normal - we can't send another reset |
michael@0 | 1731 | // with one pending. |
michael@0 | 1732 | // When we get an incoming reset (which may be a response to our |
michael@0 | 1733 | // outstanding one), see if we have any pending outgoing resets and |
michael@0 | 1734 | // send them |
michael@0 | 1735 | } else { |
michael@0 | 1736 | mStreamsResetting.Clear(); |
michael@0 | 1737 | } |
michael@0 | 1738 | moz_free(srs); |
michael@0 | 1739 | } |
michael@0 | 1740 | |
michael@0 | 1741 | void |
michael@0 | 1742 | DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst) |
michael@0 | 1743 | { |
michael@0 | 1744 | uint32_t n, i; |
michael@0 | 1745 | nsRefPtr<DataChannel> channel; // since we may null out the ref to the channel |
michael@0 | 1746 | |
michael@0 | 1747 | if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && |
michael@0 | 1748 | !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { |
michael@0 | 1749 | n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t); |
michael@0 | 1750 | for (i = 0; i < n; ++i) { |
michael@0 | 1751 | if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { |
michael@0 | 1752 | channel = FindChannelByStream(strrst->strreset_stream_list[i]); |
michael@0 | 1753 | if (channel) { |
michael@0 | 1754 | // The other side closed the channel |
michael@0 | 1755 | // We could be in three states: |
michael@0 | 1756 | // 1. Normal state (input and output streams (OPEN) |
michael@0 | 1757 | // Notify application, send a RESET in response on our |
michael@0 | 1758 | // outbound channel. Go to CLOSED |
michael@0 | 1759 | // 2. We sent our own reset (CLOSING); either they crossed on the |
michael@0 | 1760 | // wire, or this is a response to our Reset. |
michael@0 | 1761 | // Go to CLOSED |
michael@0 | 1762 | // 3. We've sent a open but haven't gotten a response yet (CONNECTING) |
michael@0 | 1763 | // I believe this is impossible, as we don't have an input stream yet. |
michael@0 | 1764 | |
michael@0 | 1765 | LOG(("Incoming: Channel %u closed, state %d", |
michael@0 | 1766 | channel->mStream, channel->mState)); |
michael@0 | 1767 | ASSERT_WEBRTC(channel->mState == DataChannel::OPEN || |
michael@0 | 1768 | channel->mState == DataChannel::CLOSING || |
michael@0 | 1769 | channel->mState == DataChannel::CONNECTING || |
michael@0 | 1770 | channel->mState == DataChannel::WAITING_TO_OPEN); |
michael@0 | 1771 | if (channel->mState == DataChannel::OPEN || |
michael@0 | 1772 | channel->mState == DataChannel::WAITING_TO_OPEN) { |
michael@0 | 1773 | ResetOutgoingStream(channel->mStream); |
michael@0 | 1774 | SendOutgoingStreamReset(); |
michael@0 | 1775 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 1776 | DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
michael@0 | 1777 | channel)); |
michael@0 | 1778 | } |
michael@0 | 1779 | mStreams[channel->mStream] = nullptr; |
michael@0 | 1780 | |
michael@0 | 1781 | LOG(("Disconnected DataChannel %p from connection %p", |
michael@0 | 1782 | (void *) channel.get(), (void *) channel->mConnection.get())); |
michael@0 | 1783 | channel->Destroy(); |
michael@0 | 1784 | // At this point when we leave here, the object is a zombie held alive only by the DOM object |
michael@0 | 1785 | } else { |
michael@0 | 1786 | LOG(("Can't find incoming channel %d",i)); |
michael@0 | 1787 | } |
michael@0 | 1788 | } |
michael@0 | 1789 | } |
michael@0 | 1790 | } |
michael@0 | 1791 | |
michael@0 | 1792 | // In case we failed to send a RESET due to having one outstanding, process any pending resets now: |
michael@0 | 1793 | if (!mStreamsResetting.IsEmpty()) { |
michael@0 | 1794 | LOG(("Sending %d pending resets", mStreamsResetting.Length())); |
michael@0 | 1795 | SendOutgoingStreamReset(); |
michael@0 | 1796 | } |
michael@0 | 1797 | } |
michael@0 | 1798 | |
michael@0 | 1799 | void |
michael@0 | 1800 | DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg) |
michael@0 | 1801 | { |
michael@0 | 1802 | uint16_t stream; |
michael@0 | 1803 | uint32_t i; |
michael@0 | 1804 | nsRefPtr<DataChannel> channel; |
michael@0 | 1805 | |
michael@0 | 1806 | if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { |
michael@0 | 1807 | LOG(("*** Failed increasing number of streams from %u (%u/%u)", |
michael@0 | 1808 | mStreams.Length(), |
michael@0 | 1809 | strchg->strchange_instrms, |
michael@0 | 1810 | strchg->strchange_outstrms)); |
michael@0 | 1811 | // XXX FIX! notify pending opens of failure |
michael@0 | 1812 | return; |
michael@0 | 1813 | } else { |
michael@0 | 1814 | if (strchg->strchange_instrms > mStreams.Length()) { |
michael@0 | 1815 | LOG(("Other side increased streams from %u to %u", |
michael@0 | 1816 | mStreams.Length(), strchg->strchange_instrms)); |
michael@0 | 1817 | } |
michael@0 | 1818 | if (strchg->strchange_outstrms > mStreams.Length() || |
michael@0 | 1819 | strchg->strchange_instrms > mStreams.Length()) { |
michael@0 | 1820 | uint16_t old_len = mStreams.Length(); |
michael@0 | 1821 | uint16_t new_len = std::max(strchg->strchange_outstrms, |
michael@0 | 1822 | strchg->strchange_instrms); |
michael@0 | 1823 | LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)", |
michael@0 | 1824 | old_len, new_len, new_len - old_len, |
michael@0 | 1825 | strchg->strchange_instrms)); |
michael@0 | 1826 | // make sure both are the same length |
michael@0 | 1827 | mStreams.AppendElements(new_len - old_len); |
michael@0 | 1828 | LOG(("New length = %d (was %d)", mStreams.Length(), old_len)); |
michael@0 | 1829 | for (size_t i = old_len; i < mStreams.Length(); ++i) { |
michael@0 | 1830 | mStreams[i] = nullptr; |
michael@0 | 1831 | } |
michael@0 | 1832 | // Re-process any channels waiting for streams. |
michael@0 | 1833 | // Linear search, but we don't increase channels often and |
michael@0 | 1834 | // the array would only get long in case of an app error normally |
michael@0 | 1835 | |
michael@0 | 1836 | // Make sure we request enough streams if there's a big jump in streams |
michael@0 | 1837 | // Could make a more complex API for OpenXxxFinish() and avoid this loop |
michael@0 | 1838 | int32_t num_needed = mPending.GetSize(); |
michael@0 | 1839 | LOG(("%d of %d new streams already needed", num_needed, |
michael@0 | 1840 | new_len - old_len)); |
michael@0 | 1841 | num_needed -= (new_len - old_len); // number we added |
michael@0 | 1842 | if (num_needed > 0) { |
michael@0 | 1843 | if (num_needed < 16) |
michael@0 | 1844 | num_needed = 16; |
michael@0 | 1845 | LOG(("Not enough new streams, asking for %d more", num_needed)); |
michael@0 | 1846 | RequestMoreStreams(num_needed); |
michael@0 | 1847 | } else if (strchg->strchange_outstrms < strchg->strchange_instrms) { |
michael@0 | 1848 | LOG(("Requesting %d output streams to match partner", |
michael@0 | 1849 | strchg->strchange_instrms - strchg->strchange_outstrms)); |
michael@0 | 1850 | RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms); |
michael@0 | 1851 | } |
michael@0 | 1852 | |
michael@0 | 1853 | ProcessQueuedOpens(); |
michael@0 | 1854 | } |
michael@0 | 1855 | // else probably not a change in # of streams |
michael@0 | 1856 | } |
michael@0 | 1857 | |
michael@0 | 1858 | for (i = 0; i < mStreams.Length(); ++i) { |
michael@0 | 1859 | channel = mStreams[i]; |
michael@0 | 1860 | if (!channel) |
michael@0 | 1861 | continue; |
michael@0 | 1862 | |
michael@0 | 1863 | if ((channel->mState == CONNECTING) && |
michael@0 | 1864 | (channel->mStream == INVALID_STREAM)) { |
michael@0 | 1865 | if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) || |
michael@0 | 1866 | (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) { |
michael@0 | 1867 | /* XXX: Signal to the other end. */ |
michael@0 | 1868 | channel->mState = CLOSED; |
michael@0 | 1869 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 1870 | DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
michael@0 | 1871 | channel)); |
michael@0 | 1872 | // maybe fire onError (bug 843625) |
michael@0 | 1873 | } else { |
michael@0 | 1874 | stream = FindFreeStream(); |
michael@0 | 1875 | if (stream != INVALID_STREAM) { |
michael@0 | 1876 | channel->mStream = stream; |
michael@0 | 1877 | mStreams[stream] = channel; |
michael@0 | 1878 | channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; |
michael@0 | 1879 | /// XXX fix |
michael@0 | 1880 | StartDefer(); |
michael@0 | 1881 | } else { |
michael@0 | 1882 | /* We will not find more ... */ |
michael@0 | 1883 | break; |
michael@0 | 1884 | } |
michael@0 | 1885 | } |
michael@0 | 1886 | } |
michael@0 | 1887 | } |
michael@0 | 1888 | } |
michael@0 | 1889 | |
michael@0 | 1890 | |
michael@0 | 1891 | // Called with mLock locked! |
michael@0 | 1892 | void |
michael@0 | 1893 | DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n) |
michael@0 | 1894 | { |
michael@0 | 1895 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 1896 | if (notif->sn_header.sn_length != (uint32_t)n) { |
michael@0 | 1897 | return; |
michael@0 | 1898 | } |
michael@0 | 1899 | switch (notif->sn_header.sn_type) { |
michael@0 | 1900 | case SCTP_ASSOC_CHANGE: |
michael@0 | 1901 | HandleAssociationChangeEvent(&(notif->sn_assoc_change)); |
michael@0 | 1902 | break; |
michael@0 | 1903 | case SCTP_PEER_ADDR_CHANGE: |
michael@0 | 1904 | HandlePeerAddressChangeEvent(&(notif->sn_paddr_change)); |
michael@0 | 1905 | break; |
michael@0 | 1906 | case SCTP_REMOTE_ERROR: |
michael@0 | 1907 | HandleRemoteErrorEvent(&(notif->sn_remote_error)); |
michael@0 | 1908 | break; |
michael@0 | 1909 | case SCTP_SHUTDOWN_EVENT: |
michael@0 | 1910 | HandleShutdownEvent(&(notif->sn_shutdown_event)); |
michael@0 | 1911 | break; |
michael@0 | 1912 | case SCTP_ADAPTATION_INDICATION: |
michael@0 | 1913 | HandleAdaptationIndication(&(notif->sn_adaptation_event)); |
michael@0 | 1914 | break; |
michael@0 | 1915 | case SCTP_PARTIAL_DELIVERY_EVENT: |
michael@0 | 1916 | LOG(("SCTP_PARTIAL_DELIVERY_EVENT")); |
michael@0 | 1917 | break; |
michael@0 | 1918 | case SCTP_AUTHENTICATION_EVENT: |
michael@0 | 1919 | LOG(("SCTP_AUTHENTICATION_EVENT")); |
michael@0 | 1920 | break; |
michael@0 | 1921 | case SCTP_SENDER_DRY_EVENT: |
michael@0 | 1922 | //LOG(("SCTP_SENDER_DRY_EVENT")); |
michael@0 | 1923 | break; |
michael@0 | 1924 | case SCTP_NOTIFICATIONS_STOPPED_EVENT: |
michael@0 | 1925 | LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT")); |
michael@0 | 1926 | break; |
michael@0 | 1927 | case SCTP_SEND_FAILED_EVENT: |
michael@0 | 1928 | HandleSendFailedEvent(&(notif->sn_send_failed_event)); |
michael@0 | 1929 | break; |
michael@0 | 1930 | case SCTP_STREAM_RESET_EVENT: |
michael@0 | 1931 | HandleStreamResetEvent(&(notif->sn_strreset_event)); |
michael@0 | 1932 | break; |
michael@0 | 1933 | case SCTP_ASSOC_RESET_EVENT: |
michael@0 | 1934 | LOG(("SCTP_ASSOC_RESET_EVENT")); |
michael@0 | 1935 | break; |
michael@0 | 1936 | case SCTP_STREAM_CHANGE_EVENT: |
michael@0 | 1937 | HandleStreamChangeEvent(&(notif->sn_strchange_event)); |
michael@0 | 1938 | break; |
michael@0 | 1939 | default: |
michael@0 | 1940 | LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type)); |
michael@0 | 1941 | break; |
michael@0 | 1942 | } |
michael@0 | 1943 | } |
michael@0 | 1944 | |
michael@0 | 1945 | int |
michael@0 | 1946 | DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen, |
michael@0 | 1947 | struct sctp_rcvinfo rcv, int32_t flags) |
michael@0 | 1948 | { |
michael@0 | 1949 | ASSERT_WEBRTC(!NS_IsMainThread()); |
michael@0 | 1950 | |
michael@0 | 1951 | if (!data) { |
michael@0 | 1952 | usrsctp_close(sock); // SCTP has finished shutting down |
michael@0 | 1953 | } else { |
michael@0 | 1954 | MutexAutoLock lock(mLock); |
michael@0 | 1955 | if (flags & MSG_NOTIFICATION) { |
michael@0 | 1956 | HandleNotification(static_cast<union sctp_notification *>(data), datalen); |
michael@0 | 1957 | } else { |
michael@0 | 1958 | HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid); |
michael@0 | 1959 | } |
michael@0 | 1960 | } |
michael@0 | 1961 | // sctp allocates 'data' with malloc(), and expects the receiver to free |
michael@0 | 1962 | // it (presumably with free). |
michael@0 | 1963 | // XXX future optimization: try to deliver messages without an internal |
michael@0 | 1964 | // alloc/copy, and if so delay the free until later. |
michael@0 | 1965 | free(data); |
michael@0 | 1966 | // usrsctp defines the callback as returning an int, but doesn't use it |
michael@0 | 1967 | return 1; |
michael@0 | 1968 | } |
michael@0 | 1969 | |
michael@0 | 1970 | already_AddRefed<DataChannel> |
michael@0 | 1971 | DataChannelConnection::Open(const nsACString& label, const nsACString& protocol, |
michael@0 | 1972 | Type type, bool inOrder, |
michael@0 | 1973 | uint32_t prValue, DataChannelListener *aListener, |
michael@0 | 1974 | nsISupports *aContext, bool aExternalNegotiated, |
michael@0 | 1975 | uint16_t aStream) |
michael@0 | 1976 | { |
michael@0 | 1977 | // aStream == INVALID_STREAM to have the protocol allocate |
michael@0 | 1978 | uint16_t prPolicy = SCTP_PR_SCTP_NONE; |
michael@0 | 1979 | uint32_t flags; |
michael@0 | 1980 | |
michael@0 | 1981 | LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u", |
michael@0 | 1982 | PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(), |
michael@0 | 1983 | type, inOrder, prValue, aListener, aContext, |
michael@0 | 1984 | aExternalNegotiated ? "true" : "false", aStream)); |
michael@0 | 1985 | switch (type) { |
michael@0 | 1986 | case DATA_CHANNEL_RELIABLE: |
michael@0 | 1987 | prPolicy = SCTP_PR_SCTP_NONE; |
michael@0 | 1988 | break; |
michael@0 | 1989 | case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: |
michael@0 | 1990 | prPolicy = SCTP_PR_SCTP_RTX; |
michael@0 | 1991 | break; |
michael@0 | 1992 | case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: |
michael@0 | 1993 | prPolicy = SCTP_PR_SCTP_TTL; |
michael@0 | 1994 | break; |
michael@0 | 1995 | } |
michael@0 | 1996 | if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) { |
michael@0 | 1997 | return nullptr; |
michael@0 | 1998 | } |
michael@0 | 1999 | |
michael@0 | 2000 | // Don't look past currently-negotiated streams |
michael@0 | 2001 | if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) { |
michael@0 | 2002 | LOG(("ERROR: external negotiation of already-open channel %u", aStream)); |
michael@0 | 2003 | // XXX How do we indicate this up to the application? Probably the |
michael@0 | 2004 | // caller's job, but we may need to return an error code. |
michael@0 | 2005 | return nullptr; |
michael@0 | 2006 | } |
michael@0 | 2007 | |
michael@0 | 2008 | flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; |
michael@0 | 2009 | nsRefPtr<DataChannel> channel(new DataChannel(this, |
michael@0 | 2010 | aStream, |
michael@0 | 2011 | DataChannel::CONNECTING, |
michael@0 | 2012 | label, protocol, |
michael@0 | 2013 | type, prValue, |
michael@0 | 2014 | flags, |
michael@0 | 2015 | aListener, aContext)); |
michael@0 | 2016 | if (aExternalNegotiated) { |
michael@0 | 2017 | channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED; |
michael@0 | 2018 | } |
michael@0 | 2019 | |
michael@0 | 2020 | MutexAutoLock lock(mLock); // OpenFinish assumes this |
michael@0 | 2021 | return OpenFinish(channel.forget()); |
michael@0 | 2022 | } |
michael@0 | 2023 | |
michael@0 | 2024 | // Separate routine so we can also call it to finish up from pending opens |
michael@0 | 2025 | already_AddRefed<DataChannel> |
michael@0 | 2026 | DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel) |
michael@0 | 2027 | { |
michael@0 | 2028 | nsRefPtr<DataChannel> channel(aChannel); // takes the reference passed in |
michael@0 | 2029 | // Normally 1 reference if called from ::Open(), or 2 if called from |
michael@0 | 2030 | // ProcessQueuedOpens() unless the DOMDataChannel was gc'd |
michael@0 | 2031 | uint16_t stream = channel->mStream; |
michael@0 | 2032 | bool queue = false; |
michael@0 | 2033 | |
michael@0 | 2034 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 2035 | |
michael@0 | 2036 | // Cases we care about: |
michael@0 | 2037 | // Pre-negotiated: |
michael@0 | 2038 | // Not Open: |
michael@0 | 2039 | // Doesn't fit: |
michael@0 | 2040 | // -> change initial ask or renegotiate after open |
michael@0 | 2041 | // -> queue open |
michael@0 | 2042 | // Open: |
michael@0 | 2043 | // Doesn't fit: |
michael@0 | 2044 | // -> RequestMoreStreams && queue |
michael@0 | 2045 | // Does fit: |
michael@0 | 2046 | // -> open |
michael@0 | 2047 | // Not negotiated: |
michael@0 | 2048 | // Not Open: |
michael@0 | 2049 | // -> queue open |
michael@0 | 2050 | // Open: |
michael@0 | 2051 | // -> Try to get a stream |
michael@0 | 2052 | // Doesn't fit: |
michael@0 | 2053 | // -> RequestMoreStreams && queue |
michael@0 | 2054 | // Does fit: |
michael@0 | 2055 | // -> open |
michael@0 | 2056 | // So the Open cases are basically the same |
michael@0 | 2057 | // Not Open cases are simply queue for non-negotiated, and |
michael@0 | 2058 | // either change the initial ask or possibly renegotiate after open. |
michael@0 | 2059 | |
michael@0 | 2060 | if (mState == OPEN) { |
michael@0 | 2061 | if (stream == INVALID_STREAM) { |
michael@0 | 2062 | stream = FindFreeStream(); // may be INVALID_STREAM if we need more |
michael@0 | 2063 | } |
michael@0 | 2064 | if (stream == INVALID_STREAM || stream >= mStreams.Length()) { |
michael@0 | 2065 | // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams |
michael@0 | 2066 | // to avoid going back immediately for more if the ask to N, N+1, etc |
michael@0 | 2067 | int32_t more_needed = (stream == INVALID_STREAM) ? 16 : |
michael@0 | 2068 | (stream-((int32_t)mStreams.Length())) + 16; |
michael@0 | 2069 | if (!RequestMoreStreams(more_needed)) { |
michael@0 | 2070 | // Something bad happened... we're done |
michael@0 | 2071 | goto request_error_cleanup; |
michael@0 | 2072 | } |
michael@0 | 2073 | queue = true; |
michael@0 | 2074 | } |
michael@0 | 2075 | } else { |
michael@0 | 2076 | // not OPEN |
michael@0 | 2077 | if (stream != INVALID_STREAM && stream >= mStreams.Length() && |
michael@0 | 2078 | mState == CLOSED) { |
michael@0 | 2079 | // Update number of streams for init message |
michael@0 | 2080 | struct sctp_initmsg initmsg; |
michael@0 | 2081 | socklen_t len = sizeof(initmsg); |
michael@0 | 2082 | int32_t total_needed = stream+16; |
michael@0 | 2083 | |
michael@0 | 2084 | memset(&initmsg, 0, sizeof(initmsg)); |
michael@0 | 2085 | if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { |
michael@0 | 2086 | LOG(("*** failed getsockopt SCTP_INITMSG")); |
michael@0 | 2087 | goto request_error_cleanup; |
michael@0 | 2088 | } |
michael@0 | 2089 | LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed, |
michael@0 | 2090 | initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); |
michael@0 | 2091 | initmsg.sinit_num_ostreams = total_needed; |
michael@0 | 2092 | initmsg.sinit_max_instreams = MAX_NUM_STREAMS; |
michael@0 | 2093 | if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, |
michael@0 | 2094 | (socklen_t)sizeof(initmsg)) < 0) { |
michael@0 | 2095 | LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); |
michael@0 | 2096 | goto request_error_cleanup; |
michael@0 | 2097 | } |
michael@0 | 2098 | |
michael@0 | 2099 | int32_t old_len = mStreams.Length(); |
michael@0 | 2100 | mStreams.AppendElements(total_needed - old_len); |
michael@0 | 2101 | for (int32_t i = old_len; i < total_needed; ++i) { |
michael@0 | 2102 | mStreams[i] = nullptr; |
michael@0 | 2103 | } |
michael@0 | 2104 | } |
michael@0 | 2105 | // else if state is CONNECTING, we'll just re-negotiate when OpenFinish |
michael@0 | 2106 | // is called, if needed |
michael@0 | 2107 | queue = true; |
michael@0 | 2108 | } |
michael@0 | 2109 | if (queue) { |
michael@0 | 2110 | LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream)); |
michael@0 | 2111 | // Also serves to mark we told the app |
michael@0 | 2112 | channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN; |
michael@0 | 2113 | channel->AddRef(); // we need a ref for the nsDeQue and one to return |
michael@0 | 2114 | mPending.Push(channel); |
michael@0 | 2115 | return channel.forget(); |
michael@0 | 2116 | } |
michael@0 | 2117 | |
michael@0 | 2118 | MOZ_ASSERT(stream != INVALID_STREAM); |
michael@0 | 2119 | // just allocated (& OPEN), or externally negotiated |
michael@0 | 2120 | mStreams[stream] = channel; // holds a reference |
michael@0 | 2121 | channel->mStream = stream; |
michael@0 | 2122 | |
michael@0 | 2123 | #ifdef TEST_QUEUED_DATA |
michael@0 | 2124 | // It's painful to write a test for this... |
michael@0 | 2125 | channel->mState = OPEN; |
michael@0 | 2126 | channel->mReady = true; |
michael@0 | 2127 | SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST); |
michael@0 | 2128 | #endif |
michael@0 | 2129 | |
michael@0 | 2130 | if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) { |
michael@0 | 2131 | // Don't send unordered until this gets cleared |
michael@0 | 2132 | channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK; |
michael@0 | 2133 | } |
michael@0 | 2134 | |
michael@0 | 2135 | if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { |
michael@0 | 2136 | if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol, |
michael@0 | 2137 | stream, |
michael@0 | 2138 | !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), |
michael@0 | 2139 | channel->mPrPolicy, channel->mPrValue)) { |
michael@0 | 2140 | LOG(("SendOpenRequest failed, errno = %d", errno)); |
michael@0 | 2141 | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
michael@0 | 2142 | channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; |
michael@0 | 2143 | StartDefer(); |
michael@0 | 2144 | |
michael@0 | 2145 | return channel.forget(); |
michael@0 | 2146 | } else { |
michael@0 | 2147 | if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { |
michael@0 | 2148 | // We already returned the channel to the app. |
michael@0 | 2149 | NS_ERROR("Failed to send open request"); |
michael@0 | 2150 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 2151 | DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
michael@0 | 2152 | channel)); |
michael@0 | 2153 | } |
michael@0 | 2154 | // If we haven't returned the channel yet, it will get destroyed when we exit |
michael@0 | 2155 | // this function. |
michael@0 | 2156 | mStreams[stream] = nullptr; |
michael@0 | 2157 | channel->mStream = INVALID_STREAM; |
michael@0 | 2158 | // we'll be destroying the channel |
michael@0 | 2159 | channel->mState = CLOSED; |
michael@0 | 2160 | return nullptr; |
michael@0 | 2161 | } |
michael@0 | 2162 | /* NOTREACHED */ |
michael@0 | 2163 | } |
michael@0 | 2164 | } |
michael@0 | 2165 | // Either externally negotiated or we sent Open |
michael@0 | 2166 | channel->mState = OPEN; |
michael@0 | 2167 | channel->mReady = true; |
michael@0 | 2168 | // FIX? Move into DOMDataChannel? I don't think we can send it yet here |
michael@0 | 2169 | LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); |
michael@0 | 2170 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 2171 | DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, |
michael@0 | 2172 | channel)); |
michael@0 | 2173 | |
michael@0 | 2174 | return channel.forget(); |
michael@0 | 2175 | |
michael@0 | 2176 | request_error_cleanup: |
michael@0 | 2177 | channel->mState = CLOSED; |
michael@0 | 2178 | if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { |
michael@0 | 2179 | // We already returned the channel to the app. |
michael@0 | 2180 | NS_ERROR("Failed to request more streams"); |
michael@0 | 2181 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 2182 | DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
michael@0 | 2183 | channel)); |
michael@0 | 2184 | return channel.forget(); |
michael@0 | 2185 | } |
michael@0 | 2186 | // we'll be destroying the channel, but it never really got set up |
michael@0 | 2187 | // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and |
michael@0 | 2188 | // Dispatch it to ourselves |
michael@0 | 2189 | return nullptr; |
michael@0 | 2190 | } |
michael@0 | 2191 | |
michael@0 | 2192 | int32_t |
michael@0 | 2193 | DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data, |
michael@0 | 2194 | uint32_t length, uint32_t ppid) |
michael@0 | 2195 | { |
michael@0 | 2196 | uint16_t flags; |
michael@0 | 2197 | struct sctp_sendv_spa spa; |
michael@0 | 2198 | int32_t result; |
michael@0 | 2199 | |
michael@0 | 2200 | NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0); |
michael@0 | 2201 | NS_WARN_IF_FALSE(length > 0, "Length is 0?!"); |
michael@0 | 2202 | |
michael@0 | 2203 | // To avoid problems where an in-order OPEN is lost and an |
michael@0 | 2204 | // out-of-order data message "beats" it, require data to be in-order |
michael@0 | 2205 | // until we get an ACK. |
michael@0 | 2206 | if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && |
michael@0 | 2207 | !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) { |
michael@0 | 2208 | flags = SCTP_UNORDERED; |
michael@0 | 2209 | } else { |
michael@0 | 2210 | flags = 0; |
michael@0 | 2211 | } |
michael@0 | 2212 | |
michael@0 | 2213 | spa.sendv_sndinfo.snd_ppid = htonl(ppid); |
michael@0 | 2214 | spa.sendv_sndinfo.snd_sid = channel->mStream; |
michael@0 | 2215 | spa.sendv_sndinfo.snd_flags = flags; |
michael@0 | 2216 | spa.sendv_sndinfo.snd_context = 0; |
michael@0 | 2217 | spa.sendv_sndinfo.snd_assoc_id = 0; |
michael@0 | 2218 | spa.sendv_flags = SCTP_SEND_SNDINFO_VALID; |
michael@0 | 2219 | |
michael@0 | 2220 | if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) { |
michael@0 | 2221 | spa.sendv_prinfo.pr_policy = channel->mPrPolicy; |
michael@0 | 2222 | spa.sendv_prinfo.pr_value = channel->mPrValue; |
michael@0 | 2223 | spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; |
michael@0 | 2224 | } |
michael@0 | 2225 | |
michael@0 | 2226 | // Note: Main-thread IO, but doesn't block! |
michael@0 | 2227 | // XXX FIX! to deal with heavy overruns of JS trying to pass data in |
michael@0 | 2228 | // (more than the buffersize) queue data onto another thread to do the |
michael@0 | 2229 | // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp |
michael@0 | 2230 | |
michael@0 | 2231 | // SCTP will return EMSGSIZE if the message is bigger than the buffer |
michael@0 | 2232 | // size (or EAGAIN if there isn't space) |
michael@0 | 2233 | if (channel->mBufferedData.IsEmpty()) { |
michael@0 | 2234 | result = usrsctp_sendv(mSocket, data, length, |
michael@0 | 2235 | nullptr, 0, |
michael@0 | 2236 | (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa), |
michael@0 | 2237 | SCTP_SENDV_SPA, 0); |
michael@0 | 2238 | LOG(("Sent buffer (len=%u), result=%d", length, result)); |
michael@0 | 2239 | } else { |
michael@0 | 2240 | // Fake EAGAIN if we're already buffering data |
michael@0 | 2241 | result = -1; |
michael@0 | 2242 | errno = EAGAIN; |
michael@0 | 2243 | } |
michael@0 | 2244 | if (result < 0) { |
michael@0 | 2245 | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
michael@0 | 2246 | // queue data for resend! And queue any further data for the stream until it is... |
michael@0 | 2247 | BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc |
michael@0 | 2248 | channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array |
michael@0 | 2249 | channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA; |
michael@0 | 2250 | LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length)); |
michael@0 | 2251 | StartDefer(); |
michael@0 | 2252 | return 0; |
michael@0 | 2253 | } |
michael@0 | 2254 | LOG(("error %d sending string", errno)); |
michael@0 | 2255 | } |
michael@0 | 2256 | return result; |
michael@0 | 2257 | } |
michael@0 | 2258 | |
michael@0 | 2259 | // Handles fragmenting binary messages |
michael@0 | 2260 | int32_t |
michael@0 | 2261 | DataChannelConnection::SendBinary(DataChannel *channel, const char *data, |
michael@0 | 2262 | uint32_t len, |
michael@0 | 2263 | uint32_t ppid_partial, uint32_t ppid_final) |
michael@0 | 2264 | { |
michael@0 | 2265 | // Since there's a limit on network buffer size and no limits on message |
michael@0 | 2266 | // size, and we don't want to use EOR mode (multiple writes for a |
michael@0 | 2267 | // message, but all other streams are blocked until you finish sending |
michael@0 | 2268 | // this message), we need to add application-level fragmentation of large |
michael@0 | 2269 | // messages. On a reliable channel, these can be simply rebuilt into a |
michael@0 | 2270 | // large message. On an unreliable channel, we can't and don't know how |
michael@0 | 2271 | // long to wait, and there are no retransmissions, and no easy way to |
michael@0 | 2272 | // tell the user "this part is missing", so on unreliable channels we |
michael@0 | 2273 | // need to return an error if sending more bytes than the network buffers |
michael@0 | 2274 | // can hold, and perhaps a lower number. |
michael@0 | 2275 | |
michael@0 | 2276 | // We *really* don't want to do this from main thread! - and SendMsgInternal |
michael@0 | 2277 | // avoids blocking. |
michael@0 | 2278 | // This MUST be reliable and in-order for the reassembly to work |
michael@0 | 2279 | if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT && |
michael@0 | 2280 | channel->mPrPolicy == DATA_CHANNEL_RELIABLE && |
michael@0 | 2281 | !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) { |
michael@0 | 2282 | int32_t sent=0; |
michael@0 | 2283 | uint32_t origlen = len; |
michael@0 | 2284 | LOG(("Sending binary message length %u in chunks", len)); |
michael@0 | 2285 | // XXX check flags for out-of-order, or force in-order for large binary messages |
michael@0 | 2286 | while (len > 0) { |
michael@0 | 2287 | uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT); |
michael@0 | 2288 | uint32_t ppid; |
michael@0 | 2289 | len -= sendlen; |
michael@0 | 2290 | ppid = len > 0 ? ppid_partial : ppid_final; |
michael@0 | 2291 | LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid)); |
michael@0 | 2292 | // Note that these might end up being deferred and queued. |
michael@0 | 2293 | sent += SendMsgInternal(channel, data, sendlen, ppid); |
michael@0 | 2294 | data += sendlen; |
michael@0 | 2295 | } |
michael@0 | 2296 | LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued", |
michael@0 | 2297 | (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT, |
michael@0 | 2298 | origlen, sent, |
michael@0 | 2299 | channel->mBufferedData.Length())); |
michael@0 | 2300 | return sent; |
michael@0 | 2301 | } |
michael@0 | 2302 | NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT, |
michael@0 | 2303 | "Sending too-large data on unreliable channel!"); |
michael@0 | 2304 | |
michael@0 | 2305 | // This will fail if the message is too large (default 256K) |
michael@0 | 2306 | return SendMsgInternal(channel, data, len, ppid_final); |
michael@0 | 2307 | } |
michael@0 | 2308 | |
michael@0 | 2309 | class ReadBlobRunnable : public nsRunnable { |
michael@0 | 2310 | public: |
michael@0 | 2311 | ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream, |
michael@0 | 2312 | nsIInputStream* aBlob) : |
michael@0 | 2313 | mConnection(aConnection), |
michael@0 | 2314 | mStream(aStream), |
michael@0 | 2315 | mBlob(aBlob) |
michael@0 | 2316 | { } |
michael@0 | 2317 | |
michael@0 | 2318 | NS_IMETHODIMP Run() { |
michael@0 | 2319 | // ReadBlob() is responsible to releasing the reference |
michael@0 | 2320 | DataChannelConnection *self = mConnection; |
michael@0 | 2321 | self->ReadBlob(mConnection.forget(), mStream, mBlob); |
michael@0 | 2322 | return NS_OK; |
michael@0 | 2323 | } |
michael@0 | 2324 | |
michael@0 | 2325 | private: |
michael@0 | 2326 | // Make sure the Connection doesn't die while there are jobs outstanding. |
michael@0 | 2327 | // Let it die (if released by PeerConnectionImpl while we're running) |
michael@0 | 2328 | // when we send our runnable back to MainThread. Then ~DataChannelConnection |
michael@0 | 2329 | // can send the IOThread to MainThread to die in a runnable, avoiding |
michael@0 | 2330 | // unsafe event loop recursion. Evil. |
michael@0 | 2331 | nsRefPtr<DataChannelConnection> mConnection; |
michael@0 | 2332 | uint16_t mStream; |
michael@0 | 2333 | // Use RefCount for preventing the object is deleted when SendBlob returns. |
michael@0 | 2334 | nsRefPtr<nsIInputStream> mBlob; |
michael@0 | 2335 | }; |
michael@0 | 2336 | |
michael@0 | 2337 | int32_t |
michael@0 | 2338 | DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob) |
michael@0 | 2339 | { |
michael@0 | 2340 | DataChannel *channel = mStreams[stream]; |
michael@0 | 2341 | NS_ENSURE_TRUE(channel, 0); |
michael@0 | 2342 | // Spawn a thread to send the data |
michael@0 | 2343 | if (!mInternalIOThread) { |
michael@0 | 2344 | nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread)); |
michael@0 | 2345 | if (NS_FAILED(res)) { |
michael@0 | 2346 | return -1; |
michael@0 | 2347 | } |
michael@0 | 2348 | } |
michael@0 | 2349 | |
michael@0 | 2350 | nsCOMPtr<nsIRunnable> runnable = new ReadBlobRunnable(this, stream, aBlob); |
michael@0 | 2351 | mInternalIOThread->Dispatch(runnable, NS_DISPATCH_NORMAL); |
michael@0 | 2352 | return 0; |
michael@0 | 2353 | } |
michael@0 | 2354 | |
michael@0 | 2355 | void |
michael@0 | 2356 | DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis, |
michael@0 | 2357 | uint16_t aStream, nsIInputStream* aBlob) |
michael@0 | 2358 | { |
michael@0 | 2359 | // NOTE: 'aThis' has been forgotten by the caller to avoid releasing |
michael@0 | 2360 | // it off mainthread; if PeerConnectionImpl has released then we want |
michael@0 | 2361 | // ~DataChannelConnection() to run on MainThread |
michael@0 | 2362 | |
michael@0 | 2363 | // XXX to do this safely, we must enqueue these atomically onto the |
michael@0 | 2364 | // output socket. We need a sender thread(s?) to enque data into the |
michael@0 | 2365 | // socket and to avoid main-thread IO that might block. Even on a |
michael@0 | 2366 | // background thread, we may not want to block on one stream's data. |
michael@0 | 2367 | // I.e. run non-blocking and service multiple channels. |
michael@0 | 2368 | |
michael@0 | 2369 | // For now as a hack, send as a single blast of queued packets which may |
michael@0 | 2370 | // be deferred until buffer space is available. |
michael@0 | 2371 | nsCString temp; |
michael@0 | 2372 | uint64_t len; |
michael@0 | 2373 | nsCOMPtr<nsIThread> mainThread; |
michael@0 | 2374 | NS_GetMainThread(getter_AddRefs(mainThread)); |
michael@0 | 2375 | |
michael@0 | 2376 | if (NS_FAILED(aBlob->Available(&len)) || |
michael@0 | 2377 | NS_FAILED(NS_ReadInputStreamToString(aBlob, temp, len))) { |
michael@0 | 2378 | // Bug 966602: Doesn't return an error to the caller via onerror. |
michael@0 | 2379 | // We must release DataChannelConnection on MainThread to avoid issues (bug 876167) |
michael@0 | 2380 | NS_ProxyRelease(mainThread, aThis.take()); |
michael@0 | 2381 | return; |
michael@0 | 2382 | } |
michael@0 | 2383 | aBlob->Close(); |
michael@0 | 2384 | RUN_ON_THREAD(mainThread, WrapRunnable(nsRefPtr<DataChannelConnection>(aThis), |
michael@0 | 2385 | &DataChannelConnection::SendBinaryMsg, |
michael@0 | 2386 | aStream, temp), |
michael@0 | 2387 | NS_DISPATCH_NORMAL); |
michael@0 | 2388 | } |
michael@0 | 2389 | |
michael@0 | 2390 | void |
michael@0 | 2391 | DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList) |
michael@0 | 2392 | { |
michael@0 | 2393 | ASSERT_WEBRTC(NS_IsMainThread()); |
michael@0 | 2394 | for (uint32_t i = 0; i < mStreams.Length(); ++i) { |
michael@0 | 2395 | if (mStreams[i]) { |
michael@0 | 2396 | aStreamList->push_back(mStreams[i]->mStream); |
michael@0 | 2397 | } |
michael@0 | 2398 | } |
michael@0 | 2399 | } |
michael@0 | 2400 | |
michael@0 | 2401 | int32_t |
michael@0 | 2402 | DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg, |
michael@0 | 2403 | bool isBinary) |
michael@0 | 2404 | { |
michael@0 | 2405 | ASSERT_WEBRTC(NS_IsMainThread()); |
michael@0 | 2406 | // We really could allow this from other threads, so long as we deal with |
michael@0 | 2407 | // asynchronosity issues with channels closing, in particular access to |
michael@0 | 2408 | // mStreams, and issues with the association closing (access to mSocket). |
michael@0 | 2409 | |
michael@0 | 2410 | const char *data = aMsg.BeginReading(); |
michael@0 | 2411 | uint32_t len = aMsg.Length(); |
michael@0 | 2412 | DataChannel *channel; |
michael@0 | 2413 | |
michael@0 | 2414 | LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len)); |
michael@0 | 2415 | // XXX if we want more efficiency, translate flags once at open time |
michael@0 | 2416 | channel = mStreams[stream]; |
michael@0 | 2417 | NS_ENSURE_TRUE(channel, 0); |
michael@0 | 2418 | |
michael@0 | 2419 | if (isBinary) |
michael@0 | 2420 | return SendBinary(channel, data, len, |
michael@0 | 2421 | DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST); |
michael@0 | 2422 | return SendBinary(channel, data, len, |
michael@0 | 2423 | DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST); |
michael@0 | 2424 | } |
michael@0 | 2425 | |
michael@0 | 2426 | void |
michael@0 | 2427 | DataChannelConnection::Close(DataChannel *aChannel) |
michael@0 | 2428 | { |
michael@0 | 2429 | MutexAutoLock lock(mLock); |
michael@0 | 2430 | CloseInt(aChannel); |
michael@0 | 2431 | } |
michael@0 | 2432 | |
michael@0 | 2433 | // So we can call Close() with the lock already held |
michael@0 | 2434 | // Called from someone who holds a ref via ::Close(), or from ~DataChannel |
michael@0 | 2435 | void |
michael@0 | 2436 | DataChannelConnection::CloseInt(DataChannel *aChannel) |
michael@0 | 2437 | { |
michael@0 | 2438 | MOZ_ASSERT(aChannel); |
michael@0 | 2439 | nsRefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us |
michael@0 | 2440 | |
michael@0 | 2441 | mLock.AssertCurrentThreadOwns(); |
michael@0 | 2442 | LOG(("Connection %p/Channel %p: Closing stream %u", |
michael@0 | 2443 | channel->mConnection.get(), channel.get(), channel->mStream)); |
michael@0 | 2444 | // re-test since it may have closed before the lock was grabbed |
michael@0 | 2445 | if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) { |
michael@0 | 2446 | LOG(("Channel already closing/closed (%u)", aChannel->mState)); |
michael@0 | 2447 | if (mState == CLOSED && channel->mStream != INVALID_STREAM) { |
michael@0 | 2448 | // called from CloseAll() |
michael@0 | 2449 | // we're not going to hang around waiting any more |
michael@0 | 2450 | mStreams[channel->mStream] = nullptr; |
michael@0 | 2451 | } |
michael@0 | 2452 | return; |
michael@0 | 2453 | } |
michael@0 | 2454 | aChannel->mBufferedData.Clear(); |
michael@0 | 2455 | if (channel->mStream != INVALID_STREAM) { |
michael@0 | 2456 | ResetOutgoingStream(channel->mStream); |
michael@0 | 2457 | if (mState == CLOSED) { // called from CloseAll() |
michael@0 | 2458 | // Let resets accumulate then send all at once in CloseAll() |
michael@0 | 2459 | // we're not going to hang around waiting |
michael@0 | 2460 | mStreams[channel->mStream] = nullptr; |
michael@0 | 2461 | } else { |
michael@0 | 2462 | SendOutgoingStreamReset(); |
michael@0 | 2463 | } |
michael@0 | 2464 | } |
michael@0 | 2465 | aChannel->mState = CLOSING; |
michael@0 | 2466 | if (mState == CLOSED) { |
michael@0 | 2467 | // we're not going to hang around waiting |
michael@0 | 2468 | channel->Destroy(); |
michael@0 | 2469 | } |
michael@0 | 2470 | // At this point when we leave here, the object is a zombie held alive only by the DOM object |
michael@0 | 2471 | } |
michael@0 | 2472 | |
michael@0 | 2473 | void DataChannelConnection::CloseAll() |
michael@0 | 2474 | { |
michael@0 | 2475 | LOG(("Closing all channels (connection %p)", (void*) this)); |
michael@0 | 2476 | // Don't need to lock here |
michael@0 | 2477 | |
michael@0 | 2478 | // Make sure no more channels will be opened |
michael@0 | 2479 | { |
michael@0 | 2480 | MutexAutoLock lock(mLock); |
michael@0 | 2481 | mState = CLOSED; |
michael@0 | 2482 | } |
michael@0 | 2483 | |
michael@0 | 2484 | // Close current channels |
michael@0 | 2485 | // If there are runnables, they hold a strong ref and keep the channel |
michael@0 | 2486 | // and/or connection alive (even if in a CLOSED state) |
michael@0 | 2487 | bool closed_some = false; |
michael@0 | 2488 | for (uint32_t i = 0; i < mStreams.Length(); ++i) { |
michael@0 | 2489 | if (mStreams[i]) { |
michael@0 | 2490 | mStreams[i]->Close(); |
michael@0 | 2491 | closed_some = true; |
michael@0 | 2492 | } |
michael@0 | 2493 | } |
michael@0 | 2494 | |
michael@0 | 2495 | // Clean up any pending opens for channels |
michael@0 | 2496 | nsRefPtr<DataChannel> channel; |
michael@0 | 2497 | while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) { |
michael@0 | 2498 | LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream)); |
michael@0 | 2499 | channel->Close(); // also releases the ref on each iteration |
michael@0 | 2500 | closed_some = true; |
michael@0 | 2501 | } |
michael@0 | 2502 | // It's more efficient to let the Resets queue in shutdown and then |
michael@0 | 2503 | // SendOutgoingStreamReset() here. |
michael@0 | 2504 | if (closed_some) { |
michael@0 | 2505 | MutexAutoLock lock(mLock); |
michael@0 | 2506 | SendOutgoingStreamReset(); |
michael@0 | 2507 | } |
michael@0 | 2508 | } |
michael@0 | 2509 | |
michael@0 | 2510 | DataChannel::~DataChannel() |
michael@0 | 2511 | { |
michael@0 | 2512 | // NS_ASSERTION since this is more "I think I caught all the cases that |
michael@0 | 2513 | // can cause this" than a true kill-the-program assertion. If this is |
michael@0 | 2514 | // wrong, nothing bad happens. A worst it's a leak. |
michael@0 | 2515 | NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel"); |
michael@0 | 2516 | } |
michael@0 | 2517 | |
michael@0 | 2518 | void |
michael@0 | 2519 | DataChannel::Close() |
michael@0 | 2520 | { |
michael@0 | 2521 | ENSURE_DATACONNECTION; |
michael@0 | 2522 | mConnection->Close(this); |
michael@0 | 2523 | } |
michael@0 | 2524 | |
michael@0 | 2525 | // Used when disconnecting from the DataChannelConnection |
michael@0 | 2526 | void |
michael@0 | 2527 | DataChannel::Destroy() |
michael@0 | 2528 | { |
michael@0 | 2529 | ENSURE_DATACONNECTION; |
michael@0 | 2530 | |
michael@0 | 2531 | LOG(("Destroying Data channel %u", mStream)); |
michael@0 | 2532 | MOZ_ASSERT_IF(mStream != INVALID_STREAM, |
michael@0 | 2533 | !mConnection->FindChannelByStream(mStream)); |
michael@0 | 2534 | mStream = INVALID_STREAM; |
michael@0 | 2535 | mState = CLOSED; |
michael@0 | 2536 | mConnection = nullptr; |
michael@0 | 2537 | } |
michael@0 | 2538 | |
michael@0 | 2539 | void |
michael@0 | 2540 | DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext) |
michael@0 | 2541 | { |
michael@0 | 2542 | MutexAutoLock mLock(mListenerLock); |
michael@0 | 2543 | mContext = aContext; |
michael@0 | 2544 | mListener = aListener; |
michael@0 | 2545 | } |
michael@0 | 2546 | |
michael@0 | 2547 | // May be called from another (i.e. Main) thread! |
michael@0 | 2548 | void |
michael@0 | 2549 | DataChannel::AppReady() |
michael@0 | 2550 | { |
michael@0 | 2551 | ENSURE_DATACONNECTION; |
michael@0 | 2552 | |
michael@0 | 2553 | MutexAutoLock lock(mConnection->mLock); |
michael@0 | 2554 | |
michael@0 | 2555 | mReady = true; |
michael@0 | 2556 | if (mState == WAITING_TO_OPEN) { |
michael@0 | 2557 | mState = OPEN; |
michael@0 | 2558 | NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
michael@0 | 2559 | DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection, |
michael@0 | 2560 | this)); |
michael@0 | 2561 | for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) { |
michael@0 | 2562 | nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i]; |
michael@0 | 2563 | MOZ_ASSERT(runnable); |
michael@0 | 2564 | NS_DispatchToMainThread(runnable); |
michael@0 | 2565 | } |
michael@0 | 2566 | } else { |
michael@0 | 2567 | NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN"); |
michael@0 | 2568 | } |
michael@0 | 2569 | mQueuedMessages.Clear(); |
michael@0 | 2570 | mQueuedMessages.Compact(); |
michael@0 | 2571 | // We never use it again... We could even allocate the array in the odd |
michael@0 | 2572 | // cases we need it. |
michael@0 | 2573 | } |
michael@0 | 2574 | |
michael@0 | 2575 | uint32_t |
michael@0 | 2576 | DataChannel::GetBufferedAmount() |
michael@0 | 2577 | { |
michael@0 | 2578 | uint32_t buffered = 0; |
michael@0 | 2579 | for (uint32_t i = 0; i < mBufferedData.Length(); ++i) { |
michael@0 | 2580 | buffered += mBufferedData[i]->mLength; |
michael@0 | 2581 | } |
michael@0 | 2582 | return buffered; |
michael@0 | 2583 | } |
michael@0 | 2584 | |
michael@0 | 2585 | // Called with mLock locked! |
michael@0 | 2586 | void |
michael@0 | 2587 | DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage) |
michael@0 | 2588 | { |
michael@0 | 2589 | if (!mReady && |
michael@0 | 2590 | (mState == CONNECTING || mState == WAITING_TO_OPEN)) { |
michael@0 | 2591 | mQueuedMessages.AppendElement(aMessage); |
michael@0 | 2592 | } else { |
michael@0 | 2593 | NS_DispatchToMainThread(aMessage); |
michael@0 | 2594 | } |
michael@0 | 2595 | } |
michael@0 | 2596 | |
michael@0 | 2597 | } // namespace mozilla |