Thu, 15 Jan 2015 15:55:04 +0100
Back out 97036ab72558 which inappropriately compared turds to third parties.
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #if !defined(__Userspace_os_Windows)
10 #include <arpa/inet.h>
11 #endif
12 // usrsctp.h expects to have errno definitions prior to its inclusion.
13 #include <errno.h>
15 #define SCTP_DEBUG 1
16 #define SCTP_STDINT_INCLUDE <stdint.h>
18 #ifdef _MSC_VER
19 // Disable "warning C4200: nonstandard extension used : zero-sized array in
20 // struct/union"
21 // ...which the third-party file usrsctp.h runs afoul of.
22 #pragma warning(push)
23 #pragma warning(disable:4200)
24 #endif
26 #include "usrsctp.h"
28 #ifdef _MSC_VER
29 #pragma warning(pop)
30 #endif
32 #include "DataChannelLog.h"
34 #include "nsServiceManagerUtils.h"
35 #include "nsIObserverService.h"
36 #include "nsIObserver.h"
37 #include "mozilla/Services.h"
38 #include "nsProxyRelease.h"
39 #include "nsThread.h"
40 #include "nsThreadUtils.h"
41 #include "nsAutoPtr.h"
42 #include "nsNetUtil.h"
43 #include "mozilla/StaticPtr.h"
44 #ifdef MOZ_PEERCONNECTION
45 #include "mtransport/runnable_utils.h"
46 #endif
48 #define DATACHANNEL_LOG(args) LOG(args)
49 #include "DataChannel.h"
50 #include "DataChannelProtocol.h"
52 #ifdef PR_LOGGING
53 PRLogModuleInfo*
54 GetDataChannelLog()
55 {
56 static PRLogModuleInfo* sLog;
57 if (!sLog)
58 sLog = PR_NewLogModule("DataChannel");
59 return sLog;
60 }
62 PRLogModuleInfo*
63 GetSCTPLog()
64 {
65 static PRLogModuleInfo* sLog;
66 if (!sLog)
67 sLog = PR_NewLogModule("SCTP");
68 return sLog;
69 }
70 #endif
// ASSERT_WEBRTC(x): lets important assertions be switched on and off in
// non-debug builds.
//  - DEBUG builds: forwards to MOZ_ASSERT.
//  - Otherwise, when MOZ_WEBRTC_ASSERT_ALWAYS is defined: crashes via
//    MOZ_CRASH() if the condition is false.
// NOTE(review): nothing in this block defines the macro when neither symbol
// is set - confirm a header provides it for that configuration.
#if defined(DEBUG)
#  define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
#  define ASSERT_WEBRTC(x) \
     do {                  \
       if (!(x)) {         \
         MOZ_CRASH();      \
       }                   \
     } while (0)
#endif
79 static bool sctp_initialized;
81 namespace mozilla {
83 class DataChannelShutdown;
84 StaticRefPtr<DataChannelShutdown> gDataChannelShutdown;
86 class DataChannelShutdown : public nsIObserver
87 {
88 public:
89 // This needs to be tied to some form object that is guaranteed to be
90 // around (singleton likely) unless we want to shutdown sctp whenever
91 // we're not using it (and in which case we'd keep a refcnt'd object
92 // ref'd by each DataChannelConnection to release the SCTP usrlib via
93 // sctp_finish)
95 NS_DECL_ISUPPORTS
97 DataChannelShutdown() {}
99 void Init()
100 {
101 nsCOMPtr<nsIObserverService> observerService =
102 mozilla::services::GetObserverService();
103 if (!observerService)
104 return;
106 nsresult rv = observerService->AddObserver(this,
107 "profile-change-net-teardown",
108 false);
109 MOZ_ASSERT(rv == NS_OK);
110 (void) rv;
111 }
113 virtual ~DataChannelShutdown()
114 {
115 nsCOMPtr<nsIObserverService> observerService =
116 mozilla::services::GetObserverService();
117 if (observerService)
118 observerService->RemoveObserver(this, "profile-change-net-teardown");
119 }
121 NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic,
122 const char16_t* aData) {
123 if (strcmp(aTopic, "profile-change-net-teardown") == 0) {
124 LOG(("Shutting down SCTP"));
125 if (sctp_initialized) {
126 usrsctp_finish();
127 sctp_initialized = false;
128 }
129 nsCOMPtr<nsIObserverService> observerService =
130 mozilla::services::GetObserverService();
131 if (!observerService)
132 return NS_ERROR_FAILURE;
134 nsresult rv = observerService->RemoveObserver(this,
135 "profile-change-net-teardown");
136 MOZ_ASSERT(rv == NS_OK);
137 (void) rv;
139 nsRefPtr<DataChannelShutdown> kungFuDeathGrip(this);
140 gDataChannelShutdown = nullptr;
141 }
142 return NS_OK;
143 }
144 };
146 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
149 BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
150 uint32_t length) : mLength(length)
151 {
152 mSpa = new sctp_sendv_spa;
153 *mSpa = spa;
154 char *tmp = new char[length]; // infallible malloc!
155 memcpy(tmp, data, length);
156 mData = tmp;
157 }
159 BufferedMsg::~BufferedMsg()
160 {
161 delete mSpa;
162 delete mData;
163 }
165 static int
166 receive_cb(struct socket* sock, union sctp_sockstore addr,
167 void *data, size_t datalen,
168 struct sctp_rcvinfo rcv, int flags, void *ulp_info)
169 {
170 DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
171 return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
172 }
#ifdef PR_LOGGING
// printf-style sink passed to usrsctp_init(). Formats the message into a
// fixed 1024-byte buffer and forwards it to PR_LogPrint, but only when the
// SCTP log module is enabled at PR_LOG_ALWAYS.
static void
debug_printf(const char *format, ...)
{
  if (!PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) {
    return;
  }

  char text[1024];
  va_list args;
  va_start(args, format);
#ifdef _WIN32
  const int written = vsnprintf_s(text, sizeof(text), _TRUNCATE, format, args);
#else
  const int written = vsnprintf(text, sizeof(text), format, args);
#endif
  // Only print when the formatter reported a positive length.
  if (written > 0) {
    PR_LogPrint("%s", text);
  }
  va_end(args);
}
#endif
195 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
196 mLock("netwerk::sctp::DataChannelConnection")
197 {
198 mState = CLOSED;
199 mSocket = nullptr;
200 mMasterSocket = nullptr;
201 mListener = listener->asWeakPtr();
202 mLocalPort = 0;
203 mRemotePort = 0;
204 mDeferTimeout = 10;
205 mTimerRunning = false;
206 LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
207 mInternalIOThread = nullptr;
208 }
210 DataChannelConnection::~DataChannelConnection()
211 {
212 LOG(("Deleting DataChannelConnection %p", (void *) this));
213 // This may die on the MainThread, or on the STS thread
214 ASSERT_WEBRTC(mState == CLOSED);
215 MOZ_ASSERT(!mMasterSocket);
216 MOZ_ASSERT(mPending.GetSize() == 0);
218 // Already disconnected from sigslot/mTransportFlow
219 // TransportFlows must be released from the STS thread
220 if (!IsSTSThread()) {
221 ASSERT_WEBRTC(NS_IsMainThread());
222 if (mTransportFlow) {
223 ASSERT_WEBRTC(mSTS);
224 NS_ProxyRelease(mSTS, mTransportFlow);
225 }
227 if (mInternalIOThread) {
228 // Avoid spinning the event thread from here (which if we're mainthread
229 // is in the event loop already)
230 NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
231 &nsIThread::Shutdown),
232 NS_DISPATCH_NORMAL);
233 }
234 } else {
235 // on STS, safe to call shutdown
236 if (mInternalIOThread) {
237 mInternalIOThread->Shutdown();
238 }
239 }
240 }
242 void
243 DataChannelConnection::Destroy()
244 {
245 // Though it's probably ok to do this and close the sockets;
246 // if we really want it to do true clean shutdowns it can
247 // create a dependant Internal object that would remain around
248 // until the network shut down the association or timed out.
249 LOG(("Destroying DataChannelConnection %p", (void *) this));
250 ASSERT_WEBRTC(NS_IsMainThread());
251 CloseAll();
253 MutexAutoLock lock(mLock);
254 // If we had a pending reset, we aren't waiting for it - clear the list so
255 // we can deregister this DataChannelConnection without leaking.
256 ClearResets();
258 MOZ_ASSERT(mSTS);
259 ASSERT_WEBRTC(NS_IsMainThread());
260 // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
261 // the usrsctp_close() calls can move back here (and just proxy the
262 // disconnect_all())
263 RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this),
264 &DataChannelConnection::DestroyOnSTS,
265 mSocket, mMasterSocket),
266 NS_DISPATCH_NORMAL);
268 // These will be released on STS
269 mSocket = nullptr;
270 mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
272 // Must do this in Destroy() since we may then delete this object
273 if (mUsingDtls) {
274 usrsctp_deregister_address(static_cast<void *>(this));
275 LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
276 }
278 // We can't get any more new callbacks from the SCTP library
279 // All existing callbacks have refs to DataChannelConnection
281 // nsDOMDataChannel objects have refs to DataChannels that have refs to us
282 }
284 void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
285 struct socket *aSocket)
286 {
287 if (aSocket && aSocket != aMasterSocket)
288 usrsctp_close(aSocket);
289 if (aMasterSocket)
290 usrsctp_close(aMasterSocket);
292 disconnect_all();
293 }
295 NS_IMPL_ISUPPORTS(DataChannelConnection,
296 nsITimerCallback)
298 bool
299 DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
300 {
301 struct sctp_initmsg initmsg;
302 struct sctp_udpencaps encaps;
303 struct sctp_assoc_value av;
304 struct sctp_event event;
305 socklen_t len;
307 uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
308 SCTP_PEER_ADDR_CHANGE,
309 SCTP_REMOTE_ERROR,
310 SCTP_SHUTDOWN_EVENT,
311 SCTP_ADAPTATION_INDICATION,
312 SCTP_SEND_FAILED_EVENT,
313 SCTP_STREAM_RESET_EVENT,
314 SCTP_STREAM_CHANGE_EVENT};
315 {
316 ASSERT_WEBRTC(NS_IsMainThread());
318 // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
319 if (!sctp_initialized) {
320 if (aUsingDtls) {
321 LOG(("sctp_init(DTLS)"));
322 #ifdef MOZ_PEERCONNECTION
323 usrsctp_init(0,
324 DataChannelConnection::SctpDtlsOutput,
325 #ifdef PR_LOGGING
326 debug_printf
327 #else
328 nullptr
329 #endif
330 );
331 #else
332 NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
333 #endif
334 } else {
335 LOG(("sctp_init(%u)", aPort));
336 usrsctp_init(aPort,
337 nullptr,
338 #ifdef PR_LOGGING
339 debug_printf
340 #else
341 nullptr
342 #endif
343 );
344 }
346 #ifdef PR_LOGGING
347 // Set logging to SCTP:PR_LOG_DEBUG to get SCTP debugs
348 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) {
349 usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
350 }
351 #endif
352 usrsctp_sysctl_set_sctp_blackhole(2);
353 // ECN is currently not supported by the Firefox code
354 usrsctp_sysctl_set_sctp_ecn_enable(0);
355 sctp_initialized = true;
357 gDataChannelShutdown = new DataChannelShutdown();
358 gDataChannelShutdown->Init();
359 }
360 }
362 // XXX FIX! make this a global we get once
363 // Find the STS thread
364 nsresult rv;
365 mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
366 MOZ_ASSERT(NS_SUCCEEDED(rv));
368 // Open sctp with a callback
369 if ((mMasterSocket = usrsctp_socket(
370 aUsingDtls ? AF_CONN : AF_INET,
371 SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) {
372 return false;
373 }
375 // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
376 // in associations for normal IO
377 if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
378 LOG(("Couldn't set non_blocking on SCTP socket"));
379 // We can't handle connect() safely if it will block, not that this will
380 // even happen.
381 goto error_cleanup;
382 }
384 // Make sure when we close the socket, make sure it doesn't call us back again!
385 // This would cause it try to use an invalid DataChannelConnection pointer
386 struct linger l;
387 l.l_onoff = 1;
388 l.l_linger = 0;
389 if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
390 (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
391 LOG(("Couldn't set SO_LINGER on SCTP socket"));
392 // unsafe to allow it to continue if this fails
393 goto error_cleanup;
394 }
396 // XXX Consider disabling this when we add proper SDP negotiation.
397 // We may want to leave enabled for supporting 'cloning' of SDP offers, which
398 // implies re-use of the same pseudo-port number, or forcing a renegotiation.
399 {
400 uint32_t on = 1;
401 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
402 (const void *)&on, (socklen_t)sizeof(on)) < 0) {
403 LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
404 }
405 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
406 (const void *)&on, (socklen_t)sizeof(on)) < 0) {
407 LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
408 }
409 }
411 if (!aUsingDtls) {
412 memset(&encaps, 0, sizeof(encaps));
413 encaps.sue_address.ss_family = AF_INET;
414 encaps.sue_port = htons(aPort);
415 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
416 (const void*)&encaps,
417 (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
418 LOG(("*** failed encaps errno %d", errno));
419 goto error_cleanup;
420 }
421 LOG(("SCTP encapsulation local port %d", aPort));
422 }
424 av.assoc_id = SCTP_ALL_ASSOC;
425 av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
426 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
427 (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
428 LOG(("*** failed enable stream reset errno %d", errno));
429 goto error_cleanup;
430 }
432 /* Enable the events of interest. */
433 memset(&event, 0, sizeof(event));
434 event.se_assoc_id = SCTP_ALL_ASSOC;
435 event.se_on = 1;
436 for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) {
437 event.se_type = event_types[i];
438 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
439 LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
440 goto error_cleanup;
441 }
442 }
444 // Update number of streams
445 mStreams.AppendElements(aNumStreams);
446 for (uint32_t i = 0; i < aNumStreams; ++i) {
447 mStreams[i] = nullptr;
448 }
449 memset(&initmsg, 0, sizeof(initmsg));
450 len = sizeof(initmsg);
451 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
452 LOG(("*** failed getsockopt SCTP_INITMSG"));
453 goto error_cleanup;
454 }
455 LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
456 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
457 initmsg.sinit_num_ostreams = aNumStreams;
458 initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
459 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
460 (socklen_t)sizeof(initmsg)) < 0) {
461 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
462 goto error_cleanup;
463 }
465 mSocket = nullptr;
466 if (aUsingDtls) {
467 mUsingDtls = true;
468 usrsctp_register_address(static_cast<void *>(this));
469 LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
470 } else {
471 mUsingDtls = false;
472 }
473 return true;
475 error_cleanup:
476 usrsctp_close(mMasterSocket);
477 mMasterSocket = nullptr;
478 mUsingDtls = false;
479 return false;
480 }
482 void
483 DataChannelConnection::StartDefer()
484 {
485 nsresult rv;
486 if (!NS_IsMainThread()) {
487 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
488 DataChannelOnMessageAvailable::START_DEFER,
489 this, (DataChannel *) nullptr));
490 return;
491 }
493 ASSERT_WEBRTC(NS_IsMainThread());
494 if (!mDeferredTimer) {
495 mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
496 MOZ_ASSERT(mDeferredTimer);
497 }
499 if (!mTimerRunning) {
500 rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
501 nsITimer::TYPE_ONE_SHOT);
502 NS_ENSURE_TRUE_VOID(rv == NS_OK);
504 mTimerRunning = true;
505 }
506 }
508 // nsITimerCallback
510 NS_IMETHODIMP
511 DataChannelConnection::Notify(nsITimer *timer)
512 {
513 ASSERT_WEBRTC(NS_IsMainThread());
514 LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout));
516 if (timer == mDeferredTimer) {
517 if (SendDeferredMessages()) {
518 // Still blocked
519 // we don't need a lock, since this must be main thread...
520 nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout,
521 nsITimer::TYPE_ONE_SHOT);
522 if (NS_FAILED(rv)) {
523 LOG(("%s: cannot initialize open timer", __FUNCTION__));
524 // XXX and do....?
525 return rv;
526 }
527 mTimerRunning = true;
528 } else {
529 LOG(("Turned off deferred send timer"));
530 mTimerRunning = false;
531 }
532 }
533 return NS_OK;
534 }
536 #ifdef MOZ_PEERCONNECTION
537 void
538 DataChannelConnection::SetEvenOdd()
539 {
540 ASSERT_WEBRTC(IsSTSThread());
542 TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
543 mTransportFlow->GetLayer(TransportLayerDtls::ID()));
544 MOZ_ASSERT(dtls); // DTLS is mandatory
545 mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
546 }
548 bool
549 DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
550 {
551 LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
553 NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
554 NS_ENSURE_TRUE(aFlow, false);
556 mTransportFlow = aFlow;
557 mLocalPort = localport;
558 mRemotePort = remoteport;
559 mState = CONNECTING;
561 RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this),
562 &DataChannelConnection::SetSignals),
563 NS_DISPATCH_NORMAL);
564 return true;
565 }
567 void
568 DataChannelConnection::SetSignals()
569 {
570 ASSERT_WEBRTC(IsSTSThread());
571 ASSERT_WEBRTC(mTransportFlow);
572 LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
573 mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
574 // SignalStateChange() doesn't call you with the initial state
575 mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
576 CompleteConnect(mTransportFlow, mTransportFlow->state());
577 }
579 void
580 DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
581 {
582 LOG(("Data transport state: %d", state));
583 MutexAutoLock lock(mLock);
584 ASSERT_WEBRTC(IsSTSThread());
585 // We should abort connection on TS_ERROR.
586 // Note however that the association will also fail (perhaps with a delay) and
587 // notify us in that way
588 if (state != TransportLayer::TS_OPEN || !mMasterSocket)
589 return;
591 struct sockaddr_conn addr;
592 memset(&addr, 0, sizeof(addr));
593 addr.sconn_family = AF_CONN;
594 #if defined(__Userspace_os_Darwin)
595 addr.sconn_len = sizeof(addr);
596 #endif
597 addr.sconn_port = htons(mLocalPort);
598 addr.sconn_addr = static_cast<void *>(this);
600 LOG(("Calling usrsctp_bind"));
601 int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
602 sizeof(addr));
603 if (r < 0) {
604 LOG(("usrsctp_bind failed: %d", r));
605 } else {
606 // This is the remote addr
607 addr.sconn_port = htons(mRemotePort);
608 LOG(("Calling usrsctp_connect"));
609 r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
610 sizeof(addr));
611 if (r < 0) {
612 if (errno == EINPROGRESS) {
613 // non-blocking
614 return;
615 } else {
616 LOG(("usrsctp_connect failed: %d", errno));
617 mState = CLOSED;
618 }
619 } else {
620 // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
621 // This also avoids issues with calling TransportFlow stuff on Mainthread
622 return;
623 }
624 }
625 // Note: currently this doesn't actually notify the application
626 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
627 DataChannelOnMessageAvailable::ON_CONNECTION,
628 this, false));
629 return;
630 }
632 // Process any pending Opens
633 void
634 DataChannelConnection::ProcessQueuedOpens()
635 {
636 // The nsDeque holds channels with an AddRef applied. Another reference
637 // (may) be held by the DOMDataChannel, unless it's been GC'd. No other
638 // references should exist.
640 // Can't copy nsDeque's. Move into temp array since any that fail will
641 // go back to mPending
642 nsDeque temp;
643 DataChannel *temp_channel; // really already_AddRefed<>
644 while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
645 temp.Push(static_cast<void *>(temp_channel));
646 }
648 nsRefPtr<DataChannel> channel;
649 // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
650 while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
651 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
652 LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
653 channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
654 // OpenFinish returns a reference itself, so we need to take it can Release it
655 channel = OpenFinish(channel.forget()); // may reset the flag and re-push
656 } else {
657 NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
658 }
659 }
661 }
662 void
663 DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
664 const unsigned char *data, size_t len)
665 {
666 #ifdef PR_LOGGING
667 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) {
668 char *buf;
670 if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
671 PR_LogPrint("%s", buf);
672 usrsctp_freedumpbuffer(buf);
673 }
674 }
675 #endif
676 // Pass the data to SCTP
677 usrsctp_conninput(static_cast<void *>(this), data, len, 0);
678 }
680 int
681 DataChannelConnection::SendPacket(const unsigned char *data, size_t len, bool release)
682 {
683 //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
684 int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
685 if (release)
686 delete data;
687 return res;
688 }
690 /* static */
691 int
692 DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
693 uint8_t tos, uint8_t set_df)
694 {
695 DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
696 int res;
698 #ifdef PR_LOGGING
699 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) {
700 char *buf;
702 if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
703 PR_LogPrint("%s", buf);
704 usrsctp_freedumpbuffer(buf);
705 }
706 }
707 #endif
708 // We're async proxying even if on the STSThread because this is called
709 // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
710 // SCTP has an option for Apple, on IP connections only, to release at least
711 // one of the locks before calling a packet output routine; with changes to
712 // the underlying SCTP stack this might remove the need to use an async proxy.
713 if (0 /*peer->IsSTSThread()*/) {
714 res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
715 } else {
716 unsigned char *data = new unsigned char[length];
717 memcpy(data, buffer, length);
718 res = -1;
719 // XXX It might be worthwhile to add an assertion against the thread
720 // somehow getting into the DataChannel/SCTP code again, as
721 // DISPATCH_SYNC is not fully blocking. This may be tricky, as it
722 // needs to be a per-thread check, not a global.
723 peer->mSTS->Dispatch(WrapRunnable(
724 nsRefPtr<DataChannelConnection>(peer),
725 &DataChannelConnection::SendPacket, data, length, true),
726 NS_DISPATCH_NORMAL);
727 res = 0; // cheat! Packets can always be dropped later anyways
728 }
729 return res;
730 }
731 #endif
733 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
734 // listen for incoming associations
735 // Blocks! - Don't call this from main thread!
737 #error This code will not work as-is since SetEvenOdd() runs on Mainthread
739 bool
740 DataChannelConnection::Listen(unsigned short port)
741 {
742 struct sockaddr_in addr;
743 socklen_t addr_len;
745 NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
747 /* Acting as the 'server' */
748 memset((void *)&addr, 0, sizeof(addr));
749 #ifdef HAVE_SIN_LEN
750 addr.sin_len = sizeof(struct sockaddr_in);
751 #endif
752 addr.sin_family = AF_INET;
753 addr.sin_port = htons(port);
754 addr.sin_addr.s_addr = htonl(INADDR_ANY);
755 LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
756 mState = CONNECTING;
757 if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
758 LOG(("***Failed userspace_bind"));
759 return false;
760 }
761 if (usrsctp_listen(mMasterSocket, 1) < 0) {
762 LOG(("***Failed userspace_listen"));
763 return false;
764 }
766 LOG(("Accepting connection"));
767 addr_len = 0;
768 if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
769 LOG(("***Failed accept"));
770 return false;
771 }
772 mState = OPEN;
774 struct linger l;
775 l.l_onoff = 1;
776 l.l_linger = 0;
777 if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
778 (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
779 LOG(("Couldn't set SO_LINGER on SCTP socket"));
780 }
782 SetEvenOdd();
784 // Notify Connection open
785 // XXX We need to make sure connection sticks around until the message is delivered
786 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
787 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
788 DataChannelOnMessageAvailable::ON_CONNECTION,
789 this, (DataChannel *) nullptr));
790 return true;
791 }
793 // Blocks! - Don't call this from main thread!
794 bool
795 DataChannelConnection::Connect(const char *addr, unsigned short port)
796 {
797 struct sockaddr_in addr4;
798 struct sockaddr_in6 addr6;
800 NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!");
802 /* Acting as the connector */
803 LOG(("Connecting to %s, port %u", addr, port));
804 memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
805 memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
806 #ifdef HAVE_SIN_LEN
807 addr4.sin_len = sizeof(struct sockaddr_in);
808 #endif
809 #ifdef HAVE_SIN6_LEN
810 addr6.sin6_len = sizeof(struct sockaddr_in6);
811 #endif
812 addr4.sin_family = AF_INET;
813 addr6.sin6_family = AF_INET6;
814 addr4.sin_port = htons(port);
815 addr6.sin6_port = htons(port);
816 mState = CONNECTING;
818 #if !defined(__Userspace_os_Windows)
819 if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
820 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
821 LOG(("*** Failed userspace_connect"));
822 return false;
823 }
824 } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
825 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
826 LOG(("*** Failed userspace_connect"));
827 return false;
828 }
829 } else {
830 LOG(("*** Illegal destination address."));
831 }
832 #else
833 {
834 struct sockaddr_storage ss;
835 int sslen = sizeof(ss);
837 if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
838 addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
839 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
840 LOG(("*** Failed userspace_connect"));
841 return false;
842 }
843 } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
844 addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
845 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
846 LOG(("*** Failed userspace_connect"));
847 return false;
848 }
849 } else {
850 LOG(("*** Illegal destination address."));
851 }
852 }
853 #endif
855 mSocket = mMasterSocket;
857 LOG(("connect() succeeded! Entering connected mode"));
858 mState = OPEN;
860 SetEvenOdd();
862 // Notify Connection open
863 // XXX We need to make sure connection sticks around until the message is delivered
864 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
865 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
866 DataChannelOnMessageAvailable::ON_CONNECTION,
867 this, (DataChannel *) nullptr));
868 return true;
869 }
870 #endif
872 DataChannel *
873 DataChannelConnection::FindChannelByStream(uint16_t stream)
874 {
875 return mStreams.SafeElementAt(stream);
876 }
878 uint16_t
879 DataChannelConnection::FindFreeStream()
880 {
881 uint32_t i, j, limit;
883 limit = mStreams.Length();
884 if (limit > MAX_NUM_STREAMS)
885 limit = MAX_NUM_STREAMS;
887 for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
888 if (!mStreams[i]) {
889 // Verify it's not still in the process of closing
890 for (j = 0; j < mStreamsResetting.Length(); ++j) {
891 if (mStreamsResetting[j] == i) {
892 break;
893 }
894 }
895 if (j == mStreamsResetting.Length())
896 break;
897 }
898 }
899 if (i >= limit) {
900 return INVALID_STREAM;
901 }
902 return i;
903 }
905 bool
906 DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
907 {
908 struct sctp_status status;
909 struct sctp_add_streams sas;
910 uint32_t outStreamsNeeded;
911 socklen_t len;
913 if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
914 aNeeded = MAX_NUM_STREAMS - mStreams.Length();
915 }
916 if (aNeeded <= 0) {
917 return false;
918 }
920 len = (socklen_t)sizeof(struct sctp_status);
921 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
922 LOG(("***failed: getsockopt SCTP_STATUS"));
923 return false;
924 }
925 outStreamsNeeded = aNeeded; // number to add
927 // Note: if multiple channel opens happen when we don't have enough space,
928 // we'll call RequestMoreStreams() multiple times
929 memset(&sas, 0, sizeof(sas));
930 sas.sas_instrms = 0;
931 sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
932 // Doesn't block, we get an event when it succeeds or fails
933 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
934 (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
935 if (errno == EALREADY) {
936 LOG(("Already have %u output streams", outStreamsNeeded));
937 return true;
938 }
940 LOG(("***failed: setsockopt ADD errno=%d", errno));
941 return false;
942 }
943 LOG(("Requested %u more streams", outStreamsNeeded));
944 // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
945 // values are larger than mStreams.Length()
946 return true;
947 }
949 int32_t
950 DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
951 {
952 struct sctp_sndinfo sndinfo;
954 // Note: Main-thread IO, but doesn't block
955 memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
956 sndinfo.snd_sid = stream;
957 sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
958 if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
959 &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
960 SCTP_SENDV_SNDINFO, 0) < 0) {
961 //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
962 return (0);
963 }
964 return (1);
965 }
967 int32_t
968 DataChannelConnection::SendOpenAckMessage(uint16_t stream)
969 {
970 struct rtcweb_datachannel_ack ack;
972 memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
973 ack.msg_type = DATA_CHANNEL_ACK;
975 return SendControlMessage(&ack, sizeof(ack), stream);
976 }
978 int32_t
979 DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
980 const nsACString& protocol,
981 uint16_t stream, bool unordered,
982 uint16_t prPolicy, uint32_t prValue)
983 {
984 int label_len = label.Length(); // not including nul
985 int proto_len = protocol.Length(); // not including nul
986 struct rtcweb_datachannel_open_request *req =
987 (struct rtcweb_datachannel_open_request*) moz_xmalloc((sizeof(*req)-1) + label_len + proto_len);
988 // careful - request includes 1 char label
990 memset(req, 0, sizeof(struct rtcweb_datachannel_open_request));
991 req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
992 switch (prPolicy) {
993 case SCTP_PR_SCTP_NONE:
994 req->channel_type = DATA_CHANNEL_RELIABLE;
995 break;
996 case SCTP_PR_SCTP_TTL:
997 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
998 break;
999 case SCTP_PR_SCTP_RTX:
1000 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
1001 break;
1002 default:
1003 // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
1004 moz_free(req);
1005 return (0);
1006 }
1007 if (unordered) {
1008 // Per the current types, all differ by 0x80 between ordered and unordered
1009 req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
1010 }
1012 req->reliability_param = htonl(prValue);
1013 req->priority = htons(0); /* XXX: add support */
1014 req->label_length = htons(label_len);
1015 req->protocol_length = htons(proto_len);
1016 memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
1017 memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
1019 // sizeof(*req) already includes +1 byte for label, need nul for both strings
1020 int32_t result = SendControlMessage(req, (sizeof(*req)-1) + label_len + proto_len, stream);
1022 moz_free(req);
1023 return result;
1024 }
1026 // XXX This should use a separate thread (outbound queue) which should
1027 // select() to know when to *try* to send data to the socket again.
1028 // Alternatively, it can use a timeout, but that's guaranteed to be wrong
1029 // (just not sure in what direction). We could re-implement NSPR's
1030 // PR_POLL_WRITE/etc handling... with a lot of work.
1032 // Better yet, use the SCTP stack's notifications on buffer state to avoid
1033 // filling the SCTP's buffers.
1035 // returns if we're still blocked or not
1036 bool
1037 DataChannelConnection::SendDeferredMessages()
1038 {
1039 uint32_t i;
1040 nsRefPtr<DataChannel> channel; // we may null out the refs to this
1041 bool still_blocked = false;
1042 bool sent = false;
1044 // This may block while something is modifying channels, but should not block for IO
1045 MutexAutoLock lock(mLock);
1047 // XXX For total fairness, on a still_blocked we'd start next time at the
1048 // same index. Sorry, not going to bother for now.
1049 for (i = 0; i < mStreams.Length(); ++i) {
1050 channel = mStreams[i];
1051 if (!channel)
1052 continue;
1054 // Only one of these should be set....
1055 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
1056 if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
1057 channel->mStream,
1058 channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
1059 channel->mPrPolicy, channel->mPrValue)) {
1060 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
1062 channel->mState = OPEN;
1063 channel->mReady = true;
1064 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
1065 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
1066 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
1067 channel));
1068 sent = true;
1069 } else {
1070 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1071 still_blocked = true;
1072 } else {
1073 // Close the channel, inform the user
1074 mStreams[channel->mStream] = nullptr;
1075 channel->mState = CLOSED;
1076 // Don't need to reset; we didn't open it
1077 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
1078 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
1079 channel));
1080 }
1081 }
1082 }
1083 if (still_blocked)
1084 break;
1086 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
1087 if (SendOpenAckMessage(channel->mStream)) {
1088 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
1089 sent = true;
1090 } else {
1091 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1092 still_blocked = true;
1093 } else {
1094 // Close the channel, inform the user
1095 CloseInt(channel);
1096 // XXX send error via DataChannelOnMessageAvailable (bug 843625)
1097 }
1098 }
1099 }
1100 if (still_blocked)
1101 break;
1103 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
1104 bool failed_send = false;
1105 int32_t result;
1107 if (channel->mState == CLOSED || channel->mState == CLOSING) {
1108 channel->mBufferedData.Clear();
1109 }
1110 while (!channel->mBufferedData.IsEmpty() &&
1111 !failed_send) {
1112 struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
1113 const char *data = channel->mBufferedData[0]->mData;
1114 uint32_t len = channel->mBufferedData[0]->mLength;
1116 // SCTP will return EMSGSIZE if the message is bigger than the buffer
1117 // size (or EAGAIN if there isn't space)
1118 if ((result = usrsctp_sendv(mSocket, data, len,
1119 nullptr, 0,
1120 (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
1121 SCTP_SENDV_SPA,
1122 0) < 0)) {
1123 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1124 // leave queued for resend
1125 failed_send = true;
1126 LOG(("queue full again when resending %d bytes (%d)", len, result));
1127 } else {
1128 LOG(("error %d re-sending string", errno));
1129 failed_send = true;
1130 }
1131 } else {
1132 LOG(("Resent buffer of %d bytes (%d)", len, result));
1133 sent = true;
1134 channel->mBufferedData.RemoveElementAt(0);
1135 }
1136 }
1137 if (channel->mBufferedData.IsEmpty())
1138 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
1139 else
1140 still_blocked = true;
1141 }
1142 if (still_blocked)
1143 break;
1144 }
1146 if (!still_blocked) {
1147 // mDeferTimeout becomes an estimate of how long we need to wait next time we block
1148 return false;
1149 }
1150 // adjust time? More time for next wait if we didn't send anything, less if did
1151 // Pretty crude, but better than nothing; just to keep CPU use down
1152 if (!sent && mDeferTimeout < 50)
1153 mDeferTimeout++;
1154 else if (sent && mDeferTimeout > 10)
1155 mDeferTimeout--;
1157 return true;
1158 }
1160 void
1161 DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
1162 size_t length,
1163 uint16_t stream)
1164 {
1165 nsRefPtr<DataChannel> channel;
1166 uint32_t prValue;
1167 uint16_t prPolicy;
1168 uint32_t flags;
1170 mLock.AssertCurrentThreadOwns();
1172 if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
1173 LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
1174 (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
1175 if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
1176 return;
1177 }
1179 LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
1181 switch (req->channel_type) {
1182 case DATA_CHANNEL_RELIABLE:
1183 case DATA_CHANNEL_RELIABLE_UNORDERED:
1184 prPolicy = SCTP_PR_SCTP_NONE;
1185 break;
1186 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1187 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
1188 prPolicy = SCTP_PR_SCTP_RTX;
1189 break;
1190 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1191 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
1192 prPolicy = SCTP_PR_SCTP_TTL;
1193 break;
1194 default:
1195 LOG(("Unknown channel type", req->channel_type));
1196 /* XXX error handling */
1197 return;
1198 }
1199 prValue = ntohl(req->reliability_param);
1200 flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1202 if ((channel = FindChannelByStream(stream))) {
1203 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
1204 LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
1205 stream, channel->mState));
1206 /* XXX: some error handling */
1207 } else {
1208 LOG(("Open for externally negotiated channel %u", stream));
1209 // XXX should also check protocol, maybe label
1210 if (prPolicy != channel->mPrPolicy ||
1211 prValue != channel->mPrValue ||
1212 flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
1213 {
1214 LOG(("WARNING: external negotiation mismatch with OpenRequest:"
1215 "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
1216 stream, prPolicy, channel->mPrPolicy,
1217 prValue, channel->mPrValue, flags, channel->mFlags));
1218 }
1219 }
1220 return;
1221 }
1222 if (stream >= mStreams.Length()) {
1223 LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
1224 return;
1225 }
1227 nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
1228 nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
1229 ntohs(req->protocol_length)));
1231 channel = new DataChannel(this,
1232 stream,
1233 DataChannel::CONNECTING,
1234 label,
1235 protocol,
1236 prPolicy, prValue,
1237 flags,
1238 nullptr, nullptr);
1239 mStreams[stream] = channel;
1241 channel->mState = DataChannel::WAITING_TO_OPEN;
1243 LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
1244 channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
1245 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
1246 DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
1247 this, channel));
1249 LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
1251 if (!SendOpenAckMessage(stream)) {
1252 // XXX Only on EAGAIN!? And if not, then close the channel??
1253 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
1254 StartDefer();
1255 }
1257 // Now process any queued data messages for the channel (which will
1258 // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
1259 // more that come in before that happens)
1260 DeliverQueuedData(stream);
1261 }
1263 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1264 // That would make this code moot. Keep it for now for backwards compatibility.
1265 void
1266 DataChannelConnection::DeliverQueuedData(uint16_t stream)
1267 {
1268 mLock.AssertCurrentThreadOwns();
1270 uint32_t i = 0;
1271 while (i < mQueuedData.Length()) {
1272 // Careful! we may modify the array length from within the loop!
1273 if (mQueuedData[i]->mStream == stream) {
1274 LOG(("Delivering queued data for stream %u, length %u",
1275 stream, mQueuedData[i]->mLength));
1276 // Deliver the queued data
1277 HandleDataMessage(mQueuedData[i]->mPpid,
1278 mQueuedData[i]->mData, mQueuedData[i]->mLength,
1279 mQueuedData[i]->mStream);
1280 mQueuedData.RemoveElementAt(i);
1281 continue; // don't bump index since we removed the element
1282 }
1283 i++;
1284 }
1285 }
1287 void
1288 DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
1289 size_t length, uint16_t stream)
1290 {
1291 DataChannel *channel;
1293 mLock.AssertCurrentThreadOwns();
1295 channel = FindChannelByStream(stream);
1296 NS_ENSURE_TRUE_VOID(channel);
1298 LOG(("OpenAck received for stream %u, waiting=%d", stream,
1299 (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
1301 channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
1302 }
1304 void
1305 DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
1306 {
1307 /* XXX: Send an error message? */
1308 LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
1309 // XXX Log to JS error console if possible
1310 }
1312 void
1313 DataChannelConnection::HandleDataMessage(uint32_t ppid,
1314 const void *data, size_t length,
1315 uint16_t stream)
1316 {
1317 DataChannel *channel;
1318 const char *buffer = (const char *) data;
1320 mLock.AssertCurrentThreadOwns();
1322 channel = FindChannelByStream(stream);
1324 // XXX A closed channel may trip this... check
1325 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1326 // That would make this code moot. Keep it for now for backwards compatibility.
1327 if (!channel) {
1328 // In the updated 0-RTT open case, the sender can send data immediately
1329 // after Open, and doesn't set the in-order bit (since we don't have a
1330 // response or ack). Also, with external negotiation, data can come in
1331 // before we're told about the external negotiation. We need to buffer
1332 // data until either a) Open comes in, if the ordering get messed up,
1333 // or b) the app tells us this channel was externally negotiated. When
1334 // these occur, we deliver the data.
1336 // Since this is rare and non-performance, keep a single list of queued
1337 // data messages to deliver once the channel opens.
1338 LOG(("Queuing data for stream %u, length %u", stream, length));
1339 // Copies data
1340 mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
1341 return;
1342 }
1344 // XXX should this be a simple if, no warnings/debugbreaks?
1345 NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
1347 {
1348 nsAutoCString recvData(buffer, length); // copies (<64) or allocates
1349 bool is_binary = true;
1351 if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
1352 ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
1353 is_binary = false;
1354 }
1355 if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
1356 NS_WARNING("DataChannel message aborted by fragment type change!");
1357 channel->mRecvBuffer.Truncate(0);
1358 }
1359 channel->mIsRecvBinary = is_binary;
1361 switch (ppid) {
1362 case DATA_CHANNEL_PPID_DOMSTRING:
1363 case DATA_CHANNEL_PPID_BINARY:
1364 channel->mRecvBuffer += recvData;
1365 LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
1366 is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
1367 channel->mStream));
1368 return; // Not ready to notify application
1370 case DATA_CHANNEL_PPID_DOMSTRING_LAST:
1371 LOG(("DataChannel: String message received of length %lu on channel %u",
1372 length, channel->mStream));
1373 if (!channel->mRecvBuffer.IsEmpty()) {
1374 channel->mRecvBuffer += recvData;
1375 LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
1376 channel->SendOrQueue(new DataChannelOnMessageAvailable(
1377 DataChannelOnMessageAvailable::ON_DATA, this,
1378 channel, channel->mRecvBuffer, -1));
1379 channel->mRecvBuffer.Truncate(0);
1380 return;
1381 }
1382 // else send using recvData normally
1383 length = -1; // Flag for DOMString
1385 // WebSockets checks IsUTF8() here; we can try to deliver it
1386 break;
1388 case DATA_CHANNEL_PPID_BINARY_LAST:
1389 LOG(("DataChannel: Received binary message of length %lu on channel id %u",
1390 length, channel->mStream));
1391 if (!channel->mRecvBuffer.IsEmpty()) {
1392 channel->mRecvBuffer += recvData;
1393 LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
1394 channel->SendOrQueue(new DataChannelOnMessageAvailable(
1395 DataChannelOnMessageAvailable::ON_DATA, this,
1396 channel, channel->mRecvBuffer,
1397 channel->mRecvBuffer.Length()));
1398 channel->mRecvBuffer.Truncate(0);
1399 return;
1400 }
1401 // else send using recvData normally
1402 break;
1404 default:
1405 NS_ERROR("Unknown data PPID");
1406 return;
1407 }
1408 /* Notify onmessage */
1409 LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
1410 channel->SendOrQueue(new DataChannelOnMessageAvailable(
1411 DataChannelOnMessageAvailable::ON_DATA, this,
1412 channel, recvData, length));
1413 }
1414 }
1416 // Called with mLock locked!
1417 void
1418 DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
1419 {
1420 const struct rtcweb_datachannel_open_request *req;
1421 const struct rtcweb_datachannel_ack *ack;
1423 mLock.AssertCurrentThreadOwns();
1425 switch (ppid) {
1426 case DATA_CHANNEL_PPID_CONTROL:
1427 req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
1429 NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
1430 switch (req->msg_type) {
1431 case DATA_CHANNEL_OPEN_REQUEST:
1432 // structure includes a possibly-unused char label[1] (in a packed structure)
1433 NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
1435 HandleOpenRequestMessage(req, length, stream);
1436 break;
1437 case DATA_CHANNEL_ACK:
1438 // >= sizeof(*ack) checked above
1440 ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
1441 HandleOpenAckMessage(ack, length, stream);
1442 break;
1443 default:
1444 HandleUnknownMessage(ppid, length, stream);
1445 break;
1446 }
1447 break;
1448 case DATA_CHANNEL_PPID_DOMSTRING:
1449 case DATA_CHANNEL_PPID_DOMSTRING_LAST:
1450 case DATA_CHANNEL_PPID_BINARY:
1451 case DATA_CHANNEL_PPID_BINARY_LAST:
1452 HandleDataMessage(ppid, buffer, length, stream);
1453 break;
1454 default:
1455 LOG(("Message of length %lu, PPID %u on stream %u received.",
1456 length, ppid, stream));
1457 break;
1458 }
1459 }
1461 void
1462 DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
1463 {
1464 uint32_t i, n;
1466 switch (sac->sac_state) {
1467 case SCTP_COMM_UP:
1468 LOG(("Association change: SCTP_COMM_UP"));
1469 if (mState == CONNECTING) {
1470 mSocket = mMasterSocket;
1471 mState = OPEN;
1473 SetEvenOdd();
1475 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
1476 DataChannelOnMessageAvailable::ON_CONNECTION,
1477 this, true));
1478 LOG(("DTLS connect() succeeded! Entering connected mode"));
1480 // Open any streams pending...
1481 ProcessQueuedOpens();
1483 } else if (mState == OPEN) {
1484 LOG(("DataConnection Already OPEN"));
1485 } else {
1486 LOG(("Unexpected state: %d", mState));
1487 }
1488 break;
1489 case SCTP_COMM_LOST:
1490 LOG(("Association change: SCTP_COMM_LOST"));
1491 // This association is toast, so also close all the channels -- from mainthread!
1492 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
1493 DataChannelOnMessageAvailable::ON_DISCONNECTED,
1494 this));
1495 break;
1496 case SCTP_RESTART:
1497 LOG(("Association change: SCTP_RESTART"));
1498 break;
1499 case SCTP_SHUTDOWN_COMP:
1500 LOG(("Association change: SCTP_SHUTDOWN_COMP"));
1501 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
1502 DataChannelOnMessageAvailable::ON_DISCONNECTED,
1503 this));
1504 break;
1505 case SCTP_CANT_STR_ASSOC:
1506 LOG(("Association change: SCTP_CANT_STR_ASSOC"));
1507 break;
1508 default:
1509 LOG(("Association change: UNKNOWN"));
1510 break;
1511 }
1512 LOG(("Association change: streams (in/out) = (%u/%u)",
1513 sac->sac_inbound_streams, sac->sac_outbound_streams));
1515 NS_ENSURE_TRUE_VOID(sac);
1516 n = sac->sac_length - sizeof(*sac);
1517 if (((sac->sac_state == SCTP_COMM_UP) ||
1518 (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
1519 for (i = 0; i < n; ++i) {
1520 switch (sac->sac_info[i]) {
1521 case SCTP_ASSOC_SUPPORTS_PR:
1522 LOG(("Supports: PR"));
1523 break;
1524 case SCTP_ASSOC_SUPPORTS_AUTH:
1525 LOG(("Supports: AUTH"));
1526 break;
1527 case SCTP_ASSOC_SUPPORTS_ASCONF:
1528 LOG(("Supports: ASCONF"));
1529 break;
1530 case SCTP_ASSOC_SUPPORTS_MULTIBUF:
1531 LOG(("Supports: MULTIBUF"));
1532 break;
1533 case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
1534 LOG(("Supports: RE-CONFIG"));
1535 break;
1536 default:
1537 LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
1538 break;
1539 }
1540 }
1541 } else if (((sac->sac_state == SCTP_COMM_LOST) ||
1542 (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
1543 LOG(("Association: ABORT ="));
1544 for (i = 0; i < n; ++i) {
1545 LOG((" 0x%02x", sac->sac_info[i]));
1546 }
1547 }
1548 if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
1549 (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
1550 (sac->sac_state == SCTP_COMM_LOST)) {
1551 return;
1552 }
1553 }
1555 void
1556 DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
1557 {
1558 char addr_buf[INET6_ADDRSTRLEN];
1559 const char *addr = "";
1560 struct sockaddr_in *sin;
1561 struct sockaddr_in6 *sin6;
1562 #if defined(__Userspace_os_Windows)
1563 DWORD addr_len = INET6_ADDRSTRLEN;
1564 #endif
1566 switch (spc->spc_aaddr.ss_family) {
1567 case AF_INET:
1568 sin = (struct sockaddr_in *)&spc->spc_aaddr;
1569 #if !defined(__Userspace_os_Windows)
1570 addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
1571 #else
1572 if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr,
1573 addr_buf, &addr_len)) {
1574 return;
1575 }
1576 #endif
1577 break;
1578 case AF_INET6:
1579 sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
1580 #if !defined(__Userspace_os_Windows)
1581 addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
1582 #else
1583 if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr,
1584 addr_buf, &addr_len)) {
1585 return;
1586 }
1587 #endif
1588 case AF_CONN:
1589 addr = "DTLS connection";
1590 break;
1591 default:
1592 break;
1593 }
1594 LOG(("Peer address %s is now ", addr));
1595 switch (spc->spc_state) {
1596 case SCTP_ADDR_AVAILABLE:
1597 LOG(("SCTP_ADDR_AVAILABLE"));
1598 break;
1599 case SCTP_ADDR_UNREACHABLE:
1600 LOG(("SCTP_ADDR_UNREACHABLE"));
1601 break;
1602 case SCTP_ADDR_REMOVED:
1603 LOG(("SCTP_ADDR_REMOVED"));
1604 break;
1605 case SCTP_ADDR_ADDED:
1606 LOG(("SCTP_ADDR_ADDED"));
1607 break;
1608 case SCTP_ADDR_MADE_PRIM:
1609 LOG(("SCTP_ADDR_MADE_PRIM"));
1610 break;
1611 case SCTP_ADDR_CONFIRMED:
1612 LOG(("SCTP_ADDR_CONFIRMED"));
1613 break;
1614 default:
1615 LOG(("UNKNOWN"));
1616 break;
1617 }
1618 LOG((" (error = 0x%08x).\n", spc->spc_error));
1619 }
1621 void
1622 DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
1623 {
1624 size_t i, n;
1626 n = sre->sre_length - sizeof(struct sctp_remote_error);
1627 LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
1628 for (i = 0; i < n; ++i) {
1629 LOG((" 0x%02x", sre-> sre_data[i]));
1630 }
1631 }
1633 void
1634 DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
1635 {
1636 LOG(("Shutdown event."));
1637 /* XXX: notify all channels. */
1638 // Attempts to actually send anything will fail
1639 }
1641 void
1642 DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
1643 {
1644 LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
1645 }
1647 void
1648 DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
1649 {
1650 size_t i, n;
1652 if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
1653 LOG(("Unsent "));
1654 }
1655 if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
1656 LOG(("Sent "));
1657 }
1658 if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
1659 LOG(("(flags = %x) ", ssfe->ssfe_flags));
1660 }
1661 LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
1662 ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
1663 ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
1664 n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
1665 for (i = 0; i < n; ++i) {
1666 LOG((" 0x%02x", ssfe->ssfe_data[i]));
1667 }
1668 }
1670 void
1671 DataChannelConnection::ClearResets()
1672 {
1673 // Clear all pending resets
1674 if (!mStreamsResetting.IsEmpty()) {
1675 LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
1676 }
1678 for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
1679 nsRefPtr<DataChannel> channel;
1680 channel = FindChannelByStream(mStreamsResetting[i]);
1681 if (channel) {
1682 LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
1683 mStreams[channel->mStream] = nullptr;
1684 }
1685 }
1686 mStreamsResetting.Clear();
1687 }
1689 void
1690 DataChannelConnection::ResetOutgoingStream(uint16_t stream)
1691 {
1692 uint32_t i;
1694 mLock.AssertCurrentThreadOwns();
1695 LOG(("Connection %p: Resetting outgoing stream %u",
1696 (void *) this, stream));
1697 // Rarely has more than a couple items and only for a short time
1698 for (i = 0; i < mStreamsResetting.Length(); ++i) {
1699 if (mStreamsResetting[i] == stream) {
1700 return;
1701 }
1702 }
1703 mStreamsResetting.AppendElement(stream);
1704 }
1706 void
1707 DataChannelConnection::SendOutgoingStreamReset()
1708 {
1709 struct sctp_reset_streams *srs;
1710 uint32_t i;
1711 size_t len;
1713 LOG(("Connection %p: Sending outgoing stream reset for %d streams",
1714 (void *) this, mStreamsResetting.Length()));
1715 mLock.AssertCurrentThreadOwns();
1716 if (mStreamsResetting.IsEmpty()) {
1717 LOG(("No streams to reset"));
1718 return;
1719 }
1720 len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
1721 srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
1722 memset(srs, 0, len);
1723 srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
1724 srs->srs_number_streams = mStreamsResetting.Length();
1725 for (i = 0; i < mStreamsResetting.Length(); ++i) {
1726 srs->srs_stream_list[i] = mStreamsResetting[i];
1727 }
1728 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
1729 LOG(("***failed: setsockopt RESET, errno %d", errno));
1730 // if errno == EALREADY, this is normal - we can't send another reset
1731 // with one pending.
1732 // When we get an incoming reset (which may be a response to our
1733 // outstanding one), see if we have any pending outgoing resets and
1734 // send them
1735 } else {
1736 mStreamsResetting.Clear();
1737 }
1738 moz_free(srs);
1739 }
1741 void
1742 DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
1743 {
1744 uint32_t n, i;
1745 nsRefPtr<DataChannel> channel; // since we may null out the ref to the channel
1747 if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
1748 !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
1749 n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
1750 for (i = 0; i < n; ++i) {
1751 if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
1752 channel = FindChannelByStream(strrst->strreset_stream_list[i]);
1753 if (channel) {
1754 // The other side closed the channel
1755 // We could be in three states:
1756 // 1. Normal state (input and output streams (OPEN)
1757 // Notify application, send a RESET in response on our
1758 // outbound channel. Go to CLOSED
1759 // 2. We sent our own reset (CLOSING); either they crossed on the
1760 // wire, or this is a response to our Reset.
1761 // Go to CLOSED
1762 // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
1763 // I believe this is impossible, as we don't have an input stream yet.
1765 LOG(("Incoming: Channel %u closed, state %d",
1766 channel->mStream, channel->mState));
1767 ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
1768 channel->mState == DataChannel::CLOSING ||
1769 channel->mState == DataChannel::CONNECTING ||
1770 channel->mState == DataChannel::WAITING_TO_OPEN);
1771 if (channel->mState == DataChannel::OPEN ||
1772 channel->mState == DataChannel::WAITING_TO_OPEN) {
1773 ResetOutgoingStream(channel->mStream);
1774 SendOutgoingStreamReset();
1775 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
1776 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
1777 channel));
1778 }
1779 mStreams[channel->mStream] = nullptr;
1781 LOG(("Disconnected DataChannel %p from connection %p",
1782 (void *) channel.get(), (void *) channel->mConnection.get()));
1783 channel->Destroy();
1784 // At this point when we leave here, the object is a zombie held alive only by the DOM object
1785 } else {
1786 LOG(("Can't find incoming channel %d",i));
1787 }
1788 }
1789 }
1790 }
1792 // In case we failed to send a RESET due to having one outstanding, process any pending resets now:
1793 if (!mStreamsResetting.IsEmpty()) {
1794 LOG(("Sending %d pending resets", mStreamsResetting.Length()));
1795 SendOutgoingStreamReset();
1796 }
1797 }
1799 void
1800 DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
1801 {
1802 uint16_t stream;
1803 uint32_t i;
1804 nsRefPtr<DataChannel> channel;
1806 if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
1807 LOG(("*** Failed increasing number of streams from %u (%u/%u)",
1808 mStreams.Length(),
1809 strchg->strchange_instrms,
1810 strchg->strchange_outstrms));
1811 // XXX FIX! notify pending opens of failure
1812 return;
1813 } else {
1814 if (strchg->strchange_instrms > mStreams.Length()) {
1815 LOG(("Other side increased streams from %u to %u",
1816 mStreams.Length(), strchg->strchange_instrms));
1817 }
1818 if (strchg->strchange_outstrms > mStreams.Length() ||
1819 strchg->strchange_instrms > mStreams.Length()) {
1820 uint16_t old_len = mStreams.Length();
1821 uint16_t new_len = std::max(strchg->strchange_outstrms,
1822 strchg->strchange_instrms);
1823 LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
1824 old_len, new_len, new_len - old_len,
1825 strchg->strchange_instrms));
1826 // make sure both are the same length
1827 mStreams.AppendElements(new_len - old_len);
1828 LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
1829 for (size_t i = old_len; i < mStreams.Length(); ++i) {
1830 mStreams[i] = nullptr;
1831 }
1832 // Re-process any channels waiting for streams.
1833 // Linear search, but we don't increase channels often and
1834 // the array would only get long in case of an app error normally
1836 // Make sure we request enough streams if there's a big jump in streams
1837 // Could make a more complex API for OpenXxxFinish() and avoid this loop
1838 int32_t num_needed = mPending.GetSize();
1839 LOG(("%d of %d new streams already needed", num_needed,
1840 new_len - old_len));
1841 num_needed -= (new_len - old_len); // number we added
1842 if (num_needed > 0) {
1843 if (num_needed < 16)
1844 num_needed = 16;
1845 LOG(("Not enough new streams, asking for %d more", num_needed));
1846 RequestMoreStreams(num_needed);
1847 } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
1848 LOG(("Requesting %d output streams to match partner",
1849 strchg->strchange_instrms - strchg->strchange_outstrms));
1850 RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
1851 }
1853 ProcessQueuedOpens();
1854 }
1855 // else probably not a change in # of streams
1856 }
1858 for (i = 0; i < mStreams.Length(); ++i) {
1859 channel = mStreams[i];
1860 if (!channel)
1861 continue;
1863 if ((channel->mState == CONNECTING) &&
1864 (channel->mStream == INVALID_STREAM)) {
1865 if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
1866 (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
1867 /* XXX: Signal to the other end. */
1868 channel->mState = CLOSED;
1869 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
1870 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
1871 channel));
1872 // maybe fire onError (bug 843625)
1873 } else {
1874 stream = FindFreeStream();
1875 if (stream != INVALID_STREAM) {
1876 channel->mStream = stream;
1877 mStreams[stream] = channel;
1878 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
1879 /// XXX fix
1880 StartDefer();
1881 } else {
1882 /* We will not find more ... */
1883 break;
1884 }
1885 }
1886 }
1887 }
1888 }
1891 // Called with mLock locked!
1892 void
1893 DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
1894 {
1895 mLock.AssertCurrentThreadOwns();
1896 if (notif->sn_header.sn_length != (uint32_t)n) {
1897 return;
1898 }
1899 switch (notif->sn_header.sn_type) {
1900 case SCTP_ASSOC_CHANGE:
1901 HandleAssociationChangeEvent(&(notif->sn_assoc_change));
1902 break;
1903 case SCTP_PEER_ADDR_CHANGE:
1904 HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
1905 break;
1906 case SCTP_REMOTE_ERROR:
1907 HandleRemoteErrorEvent(&(notif->sn_remote_error));
1908 break;
1909 case SCTP_SHUTDOWN_EVENT:
1910 HandleShutdownEvent(&(notif->sn_shutdown_event));
1911 break;
1912 case SCTP_ADAPTATION_INDICATION:
1913 HandleAdaptationIndication(&(notif->sn_adaptation_event));
1914 break;
1915 case SCTP_PARTIAL_DELIVERY_EVENT:
1916 LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
1917 break;
1918 case SCTP_AUTHENTICATION_EVENT:
1919 LOG(("SCTP_AUTHENTICATION_EVENT"));
1920 break;
1921 case SCTP_SENDER_DRY_EVENT:
1922 //LOG(("SCTP_SENDER_DRY_EVENT"));
1923 break;
1924 case SCTP_NOTIFICATIONS_STOPPED_EVENT:
1925 LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
1926 break;
1927 case SCTP_SEND_FAILED_EVENT:
1928 HandleSendFailedEvent(&(notif->sn_send_failed_event));
1929 break;
1930 case SCTP_STREAM_RESET_EVENT:
1931 HandleStreamResetEvent(&(notif->sn_strreset_event));
1932 break;
1933 case SCTP_ASSOC_RESET_EVENT:
1934 LOG(("SCTP_ASSOC_RESET_EVENT"));
1935 break;
1936 case SCTP_STREAM_CHANGE_EVENT:
1937 HandleStreamChangeEvent(&(notif->sn_strchange_event));
1938 break;
1939 default:
1940 LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
1941 break;
1942 }
1943 }
1945 int
1946 DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
1947 struct sctp_rcvinfo rcv, int32_t flags)
1948 {
1949 ASSERT_WEBRTC(!NS_IsMainThread());
1951 if (!data) {
1952 usrsctp_close(sock); // SCTP has finished shutting down
1953 } else {
1954 MutexAutoLock lock(mLock);
1955 if (flags & MSG_NOTIFICATION) {
1956 HandleNotification(static_cast<union sctp_notification *>(data), datalen);
1957 } else {
1958 HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
1959 }
1960 }
1961 // sctp allocates 'data' with malloc(), and expects the receiver to free
1962 // it (presumably with free).
1963 // XXX future optimization: try to deliver messages without an internal
1964 // alloc/copy, and if so delay the free until later.
1965 free(data);
1966 // usrsctp defines the callback as returning an int, but doesn't use it
1967 return 1;
1968 }
1970 already_AddRefed<DataChannel>
1971 DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
1972 Type type, bool inOrder,
1973 uint32_t prValue, DataChannelListener *aListener,
1974 nsISupports *aContext, bool aExternalNegotiated,
1975 uint16_t aStream)
1976 {
1977 // aStream == INVALID_STREAM to have the protocol allocate
1978 uint16_t prPolicy = SCTP_PR_SCTP_NONE;
1979 uint32_t flags;
1981 LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
1982 PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
1983 type, inOrder, prValue, aListener, aContext,
1984 aExternalNegotiated ? "true" : "false", aStream));
1985 switch (type) {
1986 case DATA_CHANNEL_RELIABLE:
1987 prPolicy = SCTP_PR_SCTP_NONE;
1988 break;
1989 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1990 prPolicy = SCTP_PR_SCTP_RTX;
1991 break;
1992 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1993 prPolicy = SCTP_PR_SCTP_TTL;
1994 break;
1995 }
1996 if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
1997 return nullptr;
1998 }
2000 // Don't look past currently-negotiated streams
2001 if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
2002 LOG(("ERROR: external negotiation of already-open channel %u", aStream));
2003 // XXX How do we indicate this up to the application? Probably the
2004 // caller's job, but we may need to return an error code.
2005 return nullptr;
2006 }
2008 flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
2009 nsRefPtr<DataChannel> channel(new DataChannel(this,
2010 aStream,
2011 DataChannel::CONNECTING,
2012 label, protocol,
2013 type, prValue,
2014 flags,
2015 aListener, aContext));
2016 if (aExternalNegotiated) {
2017 channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
2018 }
2020 MutexAutoLock lock(mLock); // OpenFinish assumes this
2021 return OpenFinish(channel.forget());
2022 }
2024 // Separate routine so we can also call it to finish up from pending opens
2025 already_AddRefed<DataChannel>
2026 DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
2027 {
2028 nsRefPtr<DataChannel> channel(aChannel); // takes the reference passed in
2029 // Normally 1 reference if called from ::Open(), or 2 if called from
2030 // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
2031 uint16_t stream = channel->mStream;
2032 bool queue = false;
2034 mLock.AssertCurrentThreadOwns();
2036 // Cases we care about:
2037 // Pre-negotiated:
2038 // Not Open:
2039 // Doesn't fit:
2040 // -> change initial ask or renegotiate after open
2041 // -> queue open
2042 // Open:
2043 // Doesn't fit:
2044 // -> RequestMoreStreams && queue
2045 // Does fit:
2046 // -> open
2047 // Not negotiated:
2048 // Not Open:
2049 // -> queue open
2050 // Open:
2051 // -> Try to get a stream
2052 // Doesn't fit:
2053 // -> RequestMoreStreams && queue
2054 // Does fit:
2055 // -> open
2056 // So the Open cases are basically the same
2057 // Not Open cases are simply queue for non-negotiated, and
2058 // either change the initial ask or possibly renegotiate after open.
2060 if (mState == OPEN) {
2061 if (stream == INVALID_STREAM) {
2062 stream = FindFreeStream(); // may be INVALID_STREAM if we need more
2063 }
2064 if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
2065 // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
2066 // to avoid going back immediately for more if the ask to N, N+1, etc
2067 int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
2068 (stream-((int32_t)mStreams.Length())) + 16;
2069 if (!RequestMoreStreams(more_needed)) {
2070 // Something bad happened... we're done
2071 goto request_error_cleanup;
2072 }
2073 queue = true;
2074 }
2075 } else {
2076 // not OPEN
2077 if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
2078 mState == CLOSED) {
2079 // Update number of streams for init message
2080 struct sctp_initmsg initmsg;
2081 socklen_t len = sizeof(initmsg);
2082 int32_t total_needed = stream+16;
2084 memset(&initmsg, 0, sizeof(initmsg));
2085 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
2086 LOG(("*** failed getsockopt SCTP_INITMSG"));
2087 goto request_error_cleanup;
2088 }
2089 LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
2090 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
2091 initmsg.sinit_num_ostreams = total_needed;
2092 initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
2093 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
2094 (socklen_t)sizeof(initmsg)) < 0) {
2095 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
2096 goto request_error_cleanup;
2097 }
2099 int32_t old_len = mStreams.Length();
2100 mStreams.AppendElements(total_needed - old_len);
2101 for (int32_t i = old_len; i < total_needed; ++i) {
2102 mStreams[i] = nullptr;
2103 }
2104 }
2105 // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
2106 // is called, if needed
2107 queue = true;
2108 }
2109 if (queue) {
2110 LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
2111 // Also serves to mark we told the app
2112 channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
2113 channel->AddRef(); // we need a ref for the nsDeQue and one to return
2114 mPending.Push(channel);
2115 return channel.forget();
2116 }
2118 MOZ_ASSERT(stream != INVALID_STREAM);
2119 // just allocated (& OPEN), or externally negotiated
2120 mStreams[stream] = channel; // holds a reference
2121 channel->mStream = stream;
2123 #ifdef TEST_QUEUED_DATA
2124 // It's painful to write a test for this...
2125 channel->mState = OPEN;
2126 channel->mReady = true;
2127 SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
2128 #endif
2130 if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
2131 // Don't send unordered until this gets cleared
2132 channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
2133 }
2135 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
2136 if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
2137 stream,
2138 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
2139 channel->mPrPolicy, channel->mPrValue)) {
2140 LOG(("SendOpenRequest failed, errno = %d", errno));
2141 if (errno == EAGAIN || errno == EWOULDBLOCK) {
2142 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
2143 StartDefer();
2145 return channel.forget();
2146 } else {
2147 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2148 // We already returned the channel to the app.
2149 NS_ERROR("Failed to send open request");
2150 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
2151 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2152 channel));
2153 }
2154 // If we haven't returned the channel yet, it will get destroyed when we exit
2155 // this function.
2156 mStreams[stream] = nullptr;
2157 channel->mStream = INVALID_STREAM;
2158 // we'll be destroying the channel
2159 channel->mState = CLOSED;
2160 return nullptr;
2161 }
2162 /* NOTREACHED */
2163 }
2164 }
2165 // Either externally negotiated or we sent Open
2166 channel->mState = OPEN;
2167 channel->mReady = true;
2168 // FIX? Move into DOMDataChannel? I don't think we can send it yet here
2169 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
2170 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
2171 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
2172 channel));
2174 return channel.forget();
2176 request_error_cleanup:
2177 channel->mState = CLOSED;
2178 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2179 // We already returned the channel to the app.
2180 NS_ERROR("Failed to request more streams");
2181 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
2182 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2183 channel));
2184 return channel.forget();
2185 }
2186 // we'll be destroying the channel, but it never really got set up
2187 // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
2188 // Dispatch it to ourselves
2189 return nullptr;
2190 }
2192 int32_t
2193 DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
2194 uint32_t length, uint32_t ppid)
2195 {
2196 uint16_t flags;
2197 struct sctp_sendv_spa spa;
2198 int32_t result;
2200 NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
2201 NS_WARN_IF_FALSE(length > 0, "Length is 0?!");
2203 // To avoid problems where an in-order OPEN is lost and an
2204 // out-of-order data message "beats" it, require data to be in-order
2205 // until we get an ACK.
2206 if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
2207 !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
2208 flags = SCTP_UNORDERED;
2209 } else {
2210 flags = 0;
2211 }
2213 spa.sendv_sndinfo.snd_ppid = htonl(ppid);
2214 spa.sendv_sndinfo.snd_sid = channel->mStream;
2215 spa.sendv_sndinfo.snd_flags = flags;
2216 spa.sendv_sndinfo.snd_context = 0;
2217 spa.sendv_sndinfo.snd_assoc_id = 0;
2218 spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
2220 if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
2221 spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
2222 spa.sendv_prinfo.pr_value = channel->mPrValue;
2223 spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
2224 }
2226 // Note: Main-thread IO, but doesn't block!
2227 // XXX FIX! to deal with heavy overruns of JS trying to pass data in
2228 // (more than the buffersize) queue data onto another thread to do the
2229 // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
2231 // SCTP will return EMSGSIZE if the message is bigger than the buffer
2232 // size (or EAGAIN if there isn't space)
2233 if (channel->mBufferedData.IsEmpty()) {
2234 result = usrsctp_sendv(mSocket, data, length,
2235 nullptr, 0,
2236 (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
2237 SCTP_SENDV_SPA, 0);
2238 LOG(("Sent buffer (len=%u), result=%d", length, result));
2239 } else {
2240 // Fake EAGAIN if we're already buffering data
2241 result = -1;
2242 errno = EAGAIN;
2243 }
2244 if (result < 0) {
2245 if (errno == EAGAIN || errno == EWOULDBLOCK) {
2246 // queue data for resend! And queue any further data for the stream until it is...
2247 BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc
2248 channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
2249 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
2250 LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
2251 StartDefer();
2252 return 0;
2253 }
2254 LOG(("error %d sending string", errno));
2255 }
2256 return result;
2257 }
2259 // Handles fragmenting binary messages
2260 int32_t
2261 DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
2262 uint32_t len,
2263 uint32_t ppid_partial, uint32_t ppid_final)
2264 {
2265 // Since there's a limit on network buffer size and no limits on message
2266 // size, and we don't want to use EOR mode (multiple writes for a
2267 // message, but all other streams are blocked until you finish sending
2268 // this message), we need to add application-level fragmentation of large
2269 // messages. On a reliable channel, these can be simply rebuilt into a
2270 // large message. On an unreliable channel, we can't and don't know how
2271 // long to wait, and there are no retransmissions, and no easy way to
2272 // tell the user "this part is missing", so on unreliable channels we
2273 // need to return an error if sending more bytes than the network buffers
2274 // can hold, and perhaps a lower number.
2276 // We *really* don't want to do this from main thread! - and SendMsgInternal
2277 // avoids blocking.
2278 // This MUST be reliable and in-order for the reassembly to work
2279 if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
2280 channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
2281 !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
2282 int32_t sent=0;
2283 uint32_t origlen = len;
2284 LOG(("Sending binary message length %u in chunks", len));
2285 // XXX check flags for out-of-order, or force in-order for large binary messages
2286 while (len > 0) {
2287 uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
2288 uint32_t ppid;
2289 len -= sendlen;
2290 ppid = len > 0 ? ppid_partial : ppid_final;
2291 LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
2292 // Note that these might end up being deferred and queued.
2293 sent += SendMsgInternal(channel, data, sendlen, ppid);
2294 data += sendlen;
2295 }
2296 LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
2297 (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2298 origlen, sent,
2299 channel->mBufferedData.Length()));
2300 return sent;
2301 }
2302 NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2303 "Sending too-large data on unreliable channel!");
2305 // This will fail if the message is too large (default 256K)
2306 return SendMsgInternal(channel, data, len, ppid_final);
2307 }
2309 class ReadBlobRunnable : public nsRunnable {
2310 public:
2311 ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
2312 nsIInputStream* aBlob) :
2313 mConnection(aConnection),
2314 mStream(aStream),
2315 mBlob(aBlob)
2316 { }
2318 NS_IMETHODIMP Run() {
2319 // ReadBlob() is responsible to releasing the reference
2320 DataChannelConnection *self = mConnection;
2321 self->ReadBlob(mConnection.forget(), mStream, mBlob);
2322 return NS_OK;
2323 }
2325 private:
2326 // Make sure the Connection doesn't die while there are jobs outstanding.
2327 // Let it die (if released by PeerConnectionImpl while we're running)
2328 // when we send our runnable back to MainThread. Then ~DataChannelConnection
2329 // can send the IOThread to MainThread to die in a runnable, avoiding
2330 // unsafe event loop recursion. Evil.
2331 nsRefPtr<DataChannelConnection> mConnection;
2332 uint16_t mStream;
2333 // Use RefCount for preventing the object is deleted when SendBlob returns.
2334 nsRefPtr<nsIInputStream> mBlob;
2335 };
2337 int32_t
2338 DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
2339 {
2340 DataChannel *channel = mStreams[stream];
2341 NS_ENSURE_TRUE(channel, 0);
2342 // Spawn a thread to send the data
2343 if (!mInternalIOThread) {
2344 nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
2345 if (NS_FAILED(res)) {
2346 return -1;
2347 }
2348 }
2350 nsCOMPtr<nsIRunnable> runnable = new ReadBlobRunnable(this, stream, aBlob);
2351 mInternalIOThread->Dispatch(runnable, NS_DISPATCH_NORMAL);
2352 return 0;
2353 }
2355 void
2356 DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
2357 uint16_t aStream, nsIInputStream* aBlob)
2358 {
2359 // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
2360 // it off mainthread; if PeerConnectionImpl has released then we want
2361 // ~DataChannelConnection() to run on MainThread
2363 // XXX to do this safely, we must enqueue these atomically onto the
2364 // output socket. We need a sender thread(s?) to enque data into the
2365 // socket and to avoid main-thread IO that might block. Even on a
2366 // background thread, we may not want to block on one stream's data.
2367 // I.e. run non-blocking and service multiple channels.
2369 // For now as a hack, send as a single blast of queued packets which may
2370 // be deferred until buffer space is available.
2371 nsCString temp;
2372 uint64_t len;
2373 nsCOMPtr<nsIThread> mainThread;
2374 NS_GetMainThread(getter_AddRefs(mainThread));
2376 if (NS_FAILED(aBlob->Available(&len)) ||
2377 NS_FAILED(NS_ReadInputStreamToString(aBlob, temp, len))) {
2378 // Bug 966602: Doesn't return an error to the caller via onerror.
2379 // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
2380 NS_ProxyRelease(mainThread, aThis.take());
2381 return;
2382 }
2383 aBlob->Close();
2384 RUN_ON_THREAD(mainThread, WrapRunnable(nsRefPtr<DataChannelConnection>(aThis),
2385 &DataChannelConnection::SendBinaryMsg,
2386 aStream, temp),
2387 NS_DISPATCH_NORMAL);
2388 }
2390 void
2391 DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
2392 {
2393 ASSERT_WEBRTC(NS_IsMainThread());
2394 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2395 if (mStreams[i]) {
2396 aStreamList->push_back(mStreams[i]->mStream);
2397 }
2398 }
2399 }
2401 int32_t
2402 DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
2403 bool isBinary)
2404 {
2405 ASSERT_WEBRTC(NS_IsMainThread());
2406 // We really could allow this from other threads, so long as we deal with
2407 // asynchronosity issues with channels closing, in particular access to
2408 // mStreams, and issues with the association closing (access to mSocket).
2410 const char *data = aMsg.BeginReading();
2411 uint32_t len = aMsg.Length();
2412 DataChannel *channel;
2414 LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
2415 // XXX if we want more efficiency, translate flags once at open time
2416 channel = mStreams[stream];
2417 NS_ENSURE_TRUE(channel, 0);
2419 if (isBinary)
2420 return SendBinary(channel, data, len,
2421 DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
2422 return SendBinary(channel, data, len,
2423 DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
2424 }
2426 void
2427 DataChannelConnection::Close(DataChannel *aChannel)
2428 {
2429 MutexAutoLock lock(mLock);
2430 CloseInt(aChannel);
2431 }
2433 // So we can call Close() with the lock already held
2434 // Called from someone who holds a ref via ::Close(), or from ~DataChannel
2435 void
2436 DataChannelConnection::CloseInt(DataChannel *aChannel)
2437 {
2438 MOZ_ASSERT(aChannel);
2439 nsRefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
2441 mLock.AssertCurrentThreadOwns();
2442 LOG(("Connection %p/Channel %p: Closing stream %u",
2443 channel->mConnection.get(), channel.get(), channel->mStream));
2444 // re-test since it may have closed before the lock was grabbed
2445 if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
2446 LOG(("Channel already closing/closed (%u)", aChannel->mState));
2447 if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
2448 // called from CloseAll()
2449 // we're not going to hang around waiting any more
2450 mStreams[channel->mStream] = nullptr;
2451 }
2452 return;
2453 }
2454 aChannel->mBufferedData.Clear();
2455 if (channel->mStream != INVALID_STREAM) {
2456 ResetOutgoingStream(channel->mStream);
2457 if (mState == CLOSED) { // called from CloseAll()
2458 // Let resets accumulate then send all at once in CloseAll()
2459 // we're not going to hang around waiting
2460 mStreams[channel->mStream] = nullptr;
2461 } else {
2462 SendOutgoingStreamReset();
2463 }
2464 }
2465 aChannel->mState = CLOSING;
2466 if (mState == CLOSED) {
2467 // we're not going to hang around waiting
2468 channel->Destroy();
2469 }
2470 // At this point when we leave here, the object is a zombie held alive only by the DOM object
2471 }
2473 void DataChannelConnection::CloseAll()
2474 {
2475 LOG(("Closing all channels (connection %p)", (void*) this));
2476 // Don't need to lock here
2478 // Make sure no more channels will be opened
2479 {
2480 MutexAutoLock lock(mLock);
2481 mState = CLOSED;
2482 }
2484 // Close current channels
2485 // If there are runnables, they hold a strong ref and keep the channel
2486 // and/or connection alive (even if in a CLOSED state)
2487 bool closed_some = false;
2488 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2489 if (mStreams[i]) {
2490 mStreams[i]->Close();
2491 closed_some = true;
2492 }
2493 }
2495 // Clean up any pending opens for channels
2496 nsRefPtr<DataChannel> channel;
2497 while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
2498 LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
2499 channel->Close(); // also releases the ref on each iteration
2500 closed_some = true;
2501 }
2502 // It's more efficient to let the Resets queue in shutdown and then
2503 // SendOutgoingStreamReset() here.
2504 if (closed_some) {
2505 MutexAutoLock lock(mLock);
2506 SendOutgoingStreamReset();
2507 }
2508 }
2510 DataChannel::~DataChannel()
2511 {
2512 // NS_ASSERTION since this is more "I think I caught all the cases that
2513 // can cause this" than a true kill-the-program assertion. If this is
2514 // wrong, nothing bad happens. A worst it's a leak.
2515 NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
2516 }
2518 void
2519 DataChannel::Close()
2520 {
2521 ENSURE_DATACONNECTION;
2522 mConnection->Close(this);
2523 }
2525 // Used when disconnecting from the DataChannelConnection
2526 void
2527 DataChannel::Destroy()
2528 {
2529 ENSURE_DATACONNECTION;
2531 LOG(("Destroying Data channel %u", mStream));
2532 MOZ_ASSERT_IF(mStream != INVALID_STREAM,
2533 !mConnection->FindChannelByStream(mStream));
2534 mStream = INVALID_STREAM;
2535 mState = CLOSED;
2536 mConnection = nullptr;
2537 }
2539 void
2540 DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
2541 {
2542 MutexAutoLock mLock(mListenerLock);
2543 mContext = aContext;
2544 mListener = aListener;
2545 }
2547 // May be called from another (i.e. Main) thread!
2548 void
2549 DataChannel::AppReady()
2550 {
2551 ENSURE_DATACONNECTION;
2553 MutexAutoLock lock(mConnection->mLock);
2555 mReady = true;
2556 if (mState == WAITING_TO_OPEN) {
2557 mState = OPEN;
2558 NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
2559 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
2560 this));
2561 for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
2562 nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
2563 MOZ_ASSERT(runnable);
2564 NS_DispatchToMainThread(runnable);
2565 }
2566 } else {
2567 NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
2568 }
2569 mQueuedMessages.Clear();
2570 mQueuedMessages.Compact();
2571 // We never use it again... We could even allocate the array in the odd
2572 // cases we need it.
2573 }
2575 uint32_t
2576 DataChannel::GetBufferedAmount()
2577 {
2578 uint32_t buffered = 0;
2579 for (uint32_t i = 0; i < mBufferedData.Length(); ++i) {
2580 buffered += mBufferedData[i]->mLength;
2581 }
2582 return buffered;
2583 }
2585 // Called with mLock locked!
2586 void
2587 DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
2588 {
2589 if (!mReady &&
2590 (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
2591 mQueuedMessages.AppendElement(aMessage);
2592 } else {
2593 NS_DispatchToMainThread(aMessage);
2594 }
2595 }
2597 } // namespace mozilla