|
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
|
2 /* vim: set ts=2 et sw=2 tw=80: */ |
|
3 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
4 * License, v. 2.0. If a copy of the MPL was not distributed with this file, |
|
5 * You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
6 |
|
7 #include <stdio.h> |
|
8 #include <stdlib.h> |
|
9 #if !defined(__Userspace_os_Windows) |
|
10 #include <arpa/inet.h> |
|
11 #endif |
|
12 // usrsctp.h expects to have errno definitions prior to its inclusion. |
|
13 #include <errno.h> |
|
14 |
|
15 #define SCTP_DEBUG 1 |
|
16 #define SCTP_STDINT_INCLUDE <stdint.h> |
|
17 |
|
18 #ifdef _MSC_VER |
|
19 // Disable "warning C4200: nonstandard extension used : zero-sized array in |
|
20 // struct/union" |
|
21 // ...which the third-party file usrsctp.h runs afoul of. |
|
22 #pragma warning(push) |
|
23 #pragma warning(disable:4200) |
|
24 #endif |
|
25 |
|
26 #include "usrsctp.h" |
|
27 |
|
28 #ifdef _MSC_VER |
|
29 #pragma warning(pop) |
|
30 #endif |
|
31 |
|
32 #include "DataChannelLog.h" |
|
33 |
|
34 #include "nsServiceManagerUtils.h" |
|
35 #include "nsIObserverService.h" |
|
36 #include "nsIObserver.h" |
|
37 #include "mozilla/Services.h" |
|
38 #include "nsProxyRelease.h" |
|
39 #include "nsThread.h" |
|
40 #include "nsThreadUtils.h" |
|
41 #include "nsAutoPtr.h" |
|
42 #include "nsNetUtil.h" |
|
43 #include "mozilla/StaticPtr.h" |
|
44 #ifdef MOZ_PEERCONNECTION |
|
45 #include "mtransport/runnable_utils.h" |
|
46 #endif |
|
47 |
|
48 #define DATACHANNEL_LOG(args) LOG(args) |
|
49 #include "DataChannel.h" |
|
50 #include "DataChannelProtocol.h" |
|
51 |
|
52 #ifdef PR_LOGGING |
|
53 PRLogModuleInfo* |
|
54 GetDataChannelLog() |
|
55 { |
|
56 static PRLogModuleInfo* sLog; |
|
57 if (!sLog) |
|
58 sLog = PR_NewLogModule("DataChannel"); |
|
59 return sLog; |
|
60 } |
|
61 |
|
62 PRLogModuleInfo* |
|
63 GetSCTPLog() |
|
64 { |
|
65 static PRLogModuleInfo* sLog; |
|
66 if (!sLog) |
|
67 sLog = PR_NewLogModule("SCTP"); |
|
68 return sLog; |
|
69 } |
|
70 #endif |
|
71 |
|
// Let us turn on and off important assertions in non-debug builds:
//  - DEBUG builds map ASSERT_WEBRTC onto the regular MOZ_ASSERT;
//  - non-debug builds that define MOZ_WEBRTC_ASSERT_ALWAYS crash when the
//    condition is false;
//  - any other build compiles the check away (the condition is not
//    evaluated), so uses of the macro still compile.
#ifdef DEBUG
#define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
#define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
#else
#define ASSERT_WEBRTC(x) do { } while (0)
#endif
|
78 |
|
// Tracks whether the usrsctp library is currently set up: raised by
// DataChannelConnection::Init() and lowered again when the shutdown
// observer calls usrsctp_finish().
static bool sctp_initialized = false;
|
80 |
|
81 namespace mozilla { |
|
82 |
|
83 class DataChannelShutdown; |
|
84 StaticRefPtr<DataChannelShutdown> gDataChannelShutdown; |
|
85 |
|
86 class DataChannelShutdown : public nsIObserver |
|
87 { |
|
88 public: |
|
89 // This needs to be tied to some form object that is guaranteed to be |
|
90 // around (singleton likely) unless we want to shutdown sctp whenever |
|
91 // we're not using it (and in which case we'd keep a refcnt'd object |
|
92 // ref'd by each DataChannelConnection to release the SCTP usrlib via |
|
93 // sctp_finish) |
|
94 |
|
95 NS_DECL_ISUPPORTS |
|
96 |
|
97 DataChannelShutdown() {} |
|
98 |
|
99 void Init() |
|
100 { |
|
101 nsCOMPtr<nsIObserverService> observerService = |
|
102 mozilla::services::GetObserverService(); |
|
103 if (!observerService) |
|
104 return; |
|
105 |
|
106 nsresult rv = observerService->AddObserver(this, |
|
107 "profile-change-net-teardown", |
|
108 false); |
|
109 MOZ_ASSERT(rv == NS_OK); |
|
110 (void) rv; |
|
111 } |
|
112 |
|
113 virtual ~DataChannelShutdown() |
|
114 { |
|
115 nsCOMPtr<nsIObserverService> observerService = |
|
116 mozilla::services::GetObserverService(); |
|
117 if (observerService) |
|
118 observerService->RemoveObserver(this, "profile-change-net-teardown"); |
|
119 } |
|
120 |
|
121 NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic, |
|
122 const char16_t* aData) { |
|
123 if (strcmp(aTopic, "profile-change-net-teardown") == 0) { |
|
124 LOG(("Shutting down SCTP")); |
|
125 if (sctp_initialized) { |
|
126 usrsctp_finish(); |
|
127 sctp_initialized = false; |
|
128 } |
|
129 nsCOMPtr<nsIObserverService> observerService = |
|
130 mozilla::services::GetObserverService(); |
|
131 if (!observerService) |
|
132 return NS_ERROR_FAILURE; |
|
133 |
|
134 nsresult rv = observerService->RemoveObserver(this, |
|
135 "profile-change-net-teardown"); |
|
136 MOZ_ASSERT(rv == NS_OK); |
|
137 (void) rv; |
|
138 |
|
139 nsRefPtr<DataChannelShutdown> kungFuDeathGrip(this); |
|
140 gDataChannelShutdown = nullptr; |
|
141 } |
|
142 return NS_OK; |
|
143 } |
|
144 }; |
|
145 |
|
146 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver); |
|
147 |
|
148 |
|
149 BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data, |
|
150 uint32_t length) : mLength(length) |
|
151 { |
|
152 mSpa = new sctp_sendv_spa; |
|
153 *mSpa = spa; |
|
154 char *tmp = new char[length]; // infallible malloc! |
|
155 memcpy(tmp, data, length); |
|
156 mData = tmp; |
|
157 } |
|
158 |
|
159 BufferedMsg::~BufferedMsg() |
|
160 { |
|
161 delete mSpa; |
|
162 delete mData; |
|
163 } |
|
164 |
|
165 static int |
|
166 receive_cb(struct socket* sock, union sctp_sockstore addr, |
|
167 void *data, size_t datalen, |
|
168 struct sctp_rcvinfo rcv, int flags, void *ulp_info) |
|
169 { |
|
170 DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info); |
|
171 return connection->ReceiveCallback(sock, data, datalen, rcv, flags); |
|
172 } |
|
173 |
|
174 #ifdef PR_LOGGING |
|
175 static void |
|
176 debug_printf(const char *format, ...) |
|
177 { |
|
178 va_list ap; |
|
179 char buffer[1024]; |
|
180 |
|
181 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) { |
|
182 va_start(ap, format); |
|
183 #ifdef _WIN32 |
|
184 if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) { |
|
185 #else |
|
186 if (vsnprintf(buffer, sizeof(buffer), format, ap) > 0) { |
|
187 #endif |
|
188 PR_LogPrint("%s", buffer); |
|
189 } |
|
190 va_end(ap); |
|
191 } |
|
192 } |
|
193 #endif |
|
194 |
|
195 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) : |
|
196 mLock("netwerk::sctp::DataChannelConnection") |
|
197 { |
|
198 mState = CLOSED; |
|
199 mSocket = nullptr; |
|
200 mMasterSocket = nullptr; |
|
201 mListener = listener->asWeakPtr(); |
|
202 mLocalPort = 0; |
|
203 mRemotePort = 0; |
|
204 mDeferTimeout = 10; |
|
205 mTimerRunning = false; |
|
206 LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get())); |
|
207 mInternalIOThread = nullptr; |
|
208 } |
|
209 |
|
210 DataChannelConnection::~DataChannelConnection() |
|
211 { |
|
212 LOG(("Deleting DataChannelConnection %p", (void *) this)); |
|
213 // This may die on the MainThread, or on the STS thread |
|
214 ASSERT_WEBRTC(mState == CLOSED); |
|
215 MOZ_ASSERT(!mMasterSocket); |
|
216 MOZ_ASSERT(mPending.GetSize() == 0); |
|
217 |
|
218 // Already disconnected from sigslot/mTransportFlow |
|
219 // TransportFlows must be released from the STS thread |
|
220 if (!IsSTSThread()) { |
|
221 ASSERT_WEBRTC(NS_IsMainThread()); |
|
222 if (mTransportFlow) { |
|
223 ASSERT_WEBRTC(mSTS); |
|
224 NS_ProxyRelease(mSTS, mTransportFlow); |
|
225 } |
|
226 |
|
227 if (mInternalIOThread) { |
|
228 // Avoid spinning the event thread from here (which if we're mainthread |
|
229 // is in the event loop already) |
|
230 NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread), |
|
231 &nsIThread::Shutdown), |
|
232 NS_DISPATCH_NORMAL); |
|
233 } |
|
234 } else { |
|
235 // on STS, safe to call shutdown |
|
236 if (mInternalIOThread) { |
|
237 mInternalIOThread->Shutdown(); |
|
238 } |
|
239 } |
|
240 } |
|
241 |
|
242 void |
|
243 DataChannelConnection::Destroy() |
|
244 { |
|
245 // Though it's probably ok to do this and close the sockets; |
|
246 // if we really want it to do true clean shutdowns it can |
|
247 // create a dependant Internal object that would remain around |
|
248 // until the network shut down the association or timed out. |
|
249 LOG(("Destroying DataChannelConnection %p", (void *) this)); |
|
250 ASSERT_WEBRTC(NS_IsMainThread()); |
|
251 CloseAll(); |
|
252 |
|
253 MutexAutoLock lock(mLock); |
|
254 // If we had a pending reset, we aren't waiting for it - clear the list so |
|
255 // we can deregister this DataChannelConnection without leaking. |
|
256 ClearResets(); |
|
257 |
|
258 MOZ_ASSERT(mSTS); |
|
259 ASSERT_WEBRTC(NS_IsMainThread()); |
|
260 // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed, |
|
261 // the usrsctp_close() calls can move back here (and just proxy the |
|
262 // disconnect_all()) |
|
263 RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this), |
|
264 &DataChannelConnection::DestroyOnSTS, |
|
265 mSocket, mMasterSocket), |
|
266 NS_DISPATCH_NORMAL); |
|
267 |
|
268 // These will be released on STS |
|
269 mSocket = nullptr; |
|
270 mMasterSocket = nullptr; // also a flag that we've Destroyed this connection |
|
271 |
|
272 // Must do this in Destroy() since we may then delete this object |
|
273 if (mUsingDtls) { |
|
274 usrsctp_deregister_address(static_cast<void *>(this)); |
|
275 LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this))); |
|
276 } |
|
277 |
|
278 // We can't get any more new callbacks from the SCTP library |
|
279 // All existing callbacks have refs to DataChannelConnection |
|
280 |
|
281 // nsDOMDataChannel objects have refs to DataChannels that have refs to us |
|
282 } |
|
283 |
|
284 void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket, |
|
285 struct socket *aSocket) |
|
286 { |
|
287 if (aSocket && aSocket != aMasterSocket) |
|
288 usrsctp_close(aSocket); |
|
289 if (aMasterSocket) |
|
290 usrsctp_close(aMasterSocket); |
|
291 |
|
292 disconnect_all(); |
|
293 } |
|
294 |
|
295 NS_IMPL_ISUPPORTS(DataChannelConnection, |
|
296 nsITimerCallback) |
|
297 |
|
298 bool |
|
299 DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls) |
|
300 { |
|
301 struct sctp_initmsg initmsg; |
|
302 struct sctp_udpencaps encaps; |
|
303 struct sctp_assoc_value av; |
|
304 struct sctp_event event; |
|
305 socklen_t len; |
|
306 |
|
307 uint16_t event_types[] = {SCTP_ASSOC_CHANGE, |
|
308 SCTP_PEER_ADDR_CHANGE, |
|
309 SCTP_REMOTE_ERROR, |
|
310 SCTP_SHUTDOWN_EVENT, |
|
311 SCTP_ADAPTATION_INDICATION, |
|
312 SCTP_SEND_FAILED_EVENT, |
|
313 SCTP_STREAM_RESET_EVENT, |
|
314 SCTP_STREAM_CHANGE_EVENT}; |
|
315 { |
|
316 ASSERT_WEBRTC(NS_IsMainThread()); |
|
317 |
|
318 // MutexAutoLock lock(mLock); Not needed since we're on mainthread always |
|
319 if (!sctp_initialized) { |
|
320 if (aUsingDtls) { |
|
321 LOG(("sctp_init(DTLS)")); |
|
322 #ifdef MOZ_PEERCONNECTION |
|
323 usrsctp_init(0, |
|
324 DataChannelConnection::SctpDtlsOutput, |
|
325 #ifdef PR_LOGGING |
|
326 debug_printf |
|
327 #else |
|
328 nullptr |
|
329 #endif |
|
330 ); |
|
331 #else |
|
332 NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport"); |
|
333 #endif |
|
334 } else { |
|
335 LOG(("sctp_init(%u)", aPort)); |
|
336 usrsctp_init(aPort, |
|
337 nullptr, |
|
338 #ifdef PR_LOGGING |
|
339 debug_printf |
|
340 #else |
|
341 nullptr |
|
342 #endif |
|
343 ); |
|
344 } |
|
345 |
|
346 #ifdef PR_LOGGING |
|
347 // Set logging to SCTP:PR_LOG_DEBUG to get SCTP debugs |
|
348 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) { |
|
349 usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); |
|
350 } |
|
351 #endif |
|
352 usrsctp_sysctl_set_sctp_blackhole(2); |
|
353 // ECN is currently not supported by the Firefox code |
|
354 usrsctp_sysctl_set_sctp_ecn_enable(0); |
|
355 sctp_initialized = true; |
|
356 |
|
357 gDataChannelShutdown = new DataChannelShutdown(); |
|
358 gDataChannelShutdown->Init(); |
|
359 } |
|
360 } |
|
361 |
|
362 // XXX FIX! make this a global we get once |
|
363 // Find the STS thread |
|
364 nsresult rv; |
|
365 mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); |
|
366 MOZ_ASSERT(NS_SUCCEEDED(rv)); |
|
367 |
|
368 // Open sctp with a callback |
|
369 if ((mMasterSocket = usrsctp_socket( |
|
370 aUsingDtls ? AF_CONN : AF_INET, |
|
371 SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) { |
|
372 return false; |
|
373 } |
|
374 |
|
375 // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking |
|
376 // in associations for normal IO |
|
377 if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) { |
|
378 LOG(("Couldn't set non_blocking on SCTP socket")); |
|
379 // We can't handle connect() safely if it will block, not that this will |
|
380 // even happen. |
|
381 goto error_cleanup; |
|
382 } |
|
383 |
|
384 // Make sure when we close the socket, make sure it doesn't call us back again! |
|
385 // This would cause it try to use an invalid DataChannelConnection pointer |
|
386 struct linger l; |
|
387 l.l_onoff = 1; |
|
388 l.l_linger = 0; |
|
389 if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, |
|
390 (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { |
|
391 LOG(("Couldn't set SO_LINGER on SCTP socket")); |
|
392 // unsafe to allow it to continue if this fails |
|
393 goto error_cleanup; |
|
394 } |
|
395 |
|
396 // XXX Consider disabling this when we add proper SDP negotiation. |
|
397 // We may want to leave enabled for supporting 'cloning' of SDP offers, which |
|
398 // implies re-use of the same pseudo-port number, or forcing a renegotiation. |
|
399 { |
|
400 uint32_t on = 1; |
|
401 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT, |
|
402 (const void *)&on, (socklen_t)sizeof(on)) < 0) { |
|
403 LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket")); |
|
404 } |
|
405 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY, |
|
406 (const void *)&on, (socklen_t)sizeof(on)) < 0) { |
|
407 LOG(("Couldn't set SCTP_NODELAY on SCTP socket")); |
|
408 } |
|
409 } |
|
410 |
|
411 if (!aUsingDtls) { |
|
412 memset(&encaps, 0, sizeof(encaps)); |
|
413 encaps.sue_address.ss_family = AF_INET; |
|
414 encaps.sue_port = htons(aPort); |
|
415 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT, |
|
416 (const void*)&encaps, |
|
417 (socklen_t)sizeof(struct sctp_udpencaps)) < 0) { |
|
418 LOG(("*** failed encaps errno %d", errno)); |
|
419 goto error_cleanup; |
|
420 } |
|
421 LOG(("SCTP encapsulation local port %d", aPort)); |
|
422 } |
|
423 |
|
424 av.assoc_id = SCTP_ALL_ASSOC; |
|
425 av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; |
|
426 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, |
|
427 (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { |
|
428 LOG(("*** failed enable stream reset errno %d", errno)); |
|
429 goto error_cleanup; |
|
430 } |
|
431 |
|
432 /* Enable the events of interest. */ |
|
433 memset(&event, 0, sizeof(event)); |
|
434 event.se_assoc_id = SCTP_ALL_ASSOC; |
|
435 event.se_on = 1; |
|
436 for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) { |
|
437 event.se_type = event_types[i]; |
|
438 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) { |
|
439 LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno)); |
|
440 goto error_cleanup; |
|
441 } |
|
442 } |
|
443 |
|
444 // Update number of streams |
|
445 mStreams.AppendElements(aNumStreams); |
|
446 for (uint32_t i = 0; i < aNumStreams; ++i) { |
|
447 mStreams[i] = nullptr; |
|
448 } |
|
449 memset(&initmsg, 0, sizeof(initmsg)); |
|
450 len = sizeof(initmsg); |
|
451 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { |
|
452 LOG(("*** failed getsockopt SCTP_INITMSG")); |
|
453 goto error_cleanup; |
|
454 } |
|
455 LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams, |
|
456 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); |
|
457 initmsg.sinit_num_ostreams = aNumStreams; |
|
458 initmsg.sinit_max_instreams = MAX_NUM_STREAMS; |
|
459 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, |
|
460 (socklen_t)sizeof(initmsg)) < 0) { |
|
461 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); |
|
462 goto error_cleanup; |
|
463 } |
|
464 |
|
465 mSocket = nullptr; |
|
466 if (aUsingDtls) { |
|
467 mUsingDtls = true; |
|
468 usrsctp_register_address(static_cast<void *>(this)); |
|
469 LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this))); |
|
470 } else { |
|
471 mUsingDtls = false; |
|
472 } |
|
473 return true; |
|
474 |
|
475 error_cleanup: |
|
476 usrsctp_close(mMasterSocket); |
|
477 mMasterSocket = nullptr; |
|
478 mUsingDtls = false; |
|
479 return false; |
|
480 } |
|
481 |
|
482 void |
|
483 DataChannelConnection::StartDefer() |
|
484 { |
|
485 nsresult rv; |
|
486 if (!NS_IsMainThread()) { |
|
487 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
488 DataChannelOnMessageAvailable::START_DEFER, |
|
489 this, (DataChannel *) nullptr)); |
|
490 return; |
|
491 } |
|
492 |
|
493 ASSERT_WEBRTC(NS_IsMainThread()); |
|
494 if (!mDeferredTimer) { |
|
495 mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); |
|
496 MOZ_ASSERT(mDeferredTimer); |
|
497 } |
|
498 |
|
499 if (!mTimerRunning) { |
|
500 rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, |
|
501 nsITimer::TYPE_ONE_SHOT); |
|
502 NS_ENSURE_TRUE_VOID(rv == NS_OK); |
|
503 |
|
504 mTimerRunning = true; |
|
505 } |
|
506 } |
|
507 |
|
508 // nsITimerCallback |
|
509 |
|
510 NS_IMETHODIMP |
|
511 DataChannelConnection::Notify(nsITimer *timer) |
|
512 { |
|
513 ASSERT_WEBRTC(NS_IsMainThread()); |
|
514 LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout)); |
|
515 |
|
516 if (timer == mDeferredTimer) { |
|
517 if (SendDeferredMessages()) { |
|
518 // Still blocked |
|
519 // we don't need a lock, since this must be main thread... |
|
520 nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, |
|
521 nsITimer::TYPE_ONE_SHOT); |
|
522 if (NS_FAILED(rv)) { |
|
523 LOG(("%s: cannot initialize open timer", __FUNCTION__)); |
|
524 // XXX and do....? |
|
525 return rv; |
|
526 } |
|
527 mTimerRunning = true; |
|
528 } else { |
|
529 LOG(("Turned off deferred send timer")); |
|
530 mTimerRunning = false; |
|
531 } |
|
532 } |
|
533 return NS_OK; |
|
534 } |
|
535 |
|
536 #ifdef MOZ_PEERCONNECTION |
|
537 void |
|
538 DataChannelConnection::SetEvenOdd() |
|
539 { |
|
540 ASSERT_WEBRTC(IsSTSThread()); |
|
541 |
|
542 TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>( |
|
543 mTransportFlow->GetLayer(TransportLayerDtls::ID())); |
|
544 MOZ_ASSERT(dtls); // DTLS is mandatory |
|
545 mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT); |
|
546 } |
|
547 |
|
548 bool |
|
549 DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport) |
|
550 { |
|
551 LOG(("Connect DTLS local %u, remote %u", localport, remoteport)); |
|
552 |
|
553 NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!"); |
|
554 NS_ENSURE_TRUE(aFlow, false); |
|
555 |
|
556 mTransportFlow = aFlow; |
|
557 mLocalPort = localport; |
|
558 mRemotePort = remoteport; |
|
559 mState = CONNECTING; |
|
560 |
|
561 RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr<DataChannelConnection>(this), |
|
562 &DataChannelConnection::SetSignals), |
|
563 NS_DISPATCH_NORMAL); |
|
564 return true; |
|
565 } |
|
566 |
|
567 void |
|
568 DataChannelConnection::SetSignals() |
|
569 { |
|
570 ASSERT_WEBRTC(IsSTSThread()); |
|
571 ASSERT_WEBRTC(mTransportFlow); |
|
572 LOG(("Setting transport signals, state: %d", mTransportFlow->state())); |
|
573 mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput); |
|
574 // SignalStateChange() doesn't call you with the initial state |
|
575 mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect); |
|
576 CompleteConnect(mTransportFlow, mTransportFlow->state()); |
|
577 } |
|
578 |
|
579 void |
|
580 DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state) |
|
581 { |
|
582 LOG(("Data transport state: %d", state)); |
|
583 MutexAutoLock lock(mLock); |
|
584 ASSERT_WEBRTC(IsSTSThread()); |
|
585 // We should abort connection on TS_ERROR. |
|
586 // Note however that the association will also fail (perhaps with a delay) and |
|
587 // notify us in that way |
|
588 if (state != TransportLayer::TS_OPEN || !mMasterSocket) |
|
589 return; |
|
590 |
|
591 struct sockaddr_conn addr; |
|
592 memset(&addr, 0, sizeof(addr)); |
|
593 addr.sconn_family = AF_CONN; |
|
594 #if defined(__Userspace_os_Darwin) |
|
595 addr.sconn_len = sizeof(addr); |
|
596 #endif |
|
597 addr.sconn_port = htons(mLocalPort); |
|
598 addr.sconn_addr = static_cast<void *>(this); |
|
599 |
|
600 LOG(("Calling usrsctp_bind")); |
|
601 int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), |
|
602 sizeof(addr)); |
|
603 if (r < 0) { |
|
604 LOG(("usrsctp_bind failed: %d", r)); |
|
605 } else { |
|
606 // This is the remote addr |
|
607 addr.sconn_port = htons(mRemotePort); |
|
608 LOG(("Calling usrsctp_connect")); |
|
609 r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), |
|
610 sizeof(addr)); |
|
611 if (r < 0) { |
|
612 if (errno == EINPROGRESS) { |
|
613 // non-blocking |
|
614 return; |
|
615 } else { |
|
616 LOG(("usrsctp_connect failed: %d", errno)); |
|
617 mState = CLOSED; |
|
618 } |
|
619 } else { |
|
620 // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that |
|
621 // This also avoids issues with calling TransportFlow stuff on Mainthread |
|
622 return; |
|
623 } |
|
624 } |
|
625 // Note: currently this doesn't actually notify the application |
|
626 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
627 DataChannelOnMessageAvailable::ON_CONNECTION, |
|
628 this, false)); |
|
629 return; |
|
630 } |
|
631 |
|
632 // Process any pending Opens |
|
633 void |
|
634 DataChannelConnection::ProcessQueuedOpens() |
|
635 { |
|
636 // The nsDeque holds channels with an AddRef applied. Another reference |
|
637 // (may) be held by the DOMDataChannel, unless it's been GC'd. No other |
|
638 // references should exist. |
|
639 |
|
640 // Can't copy nsDeque's. Move into temp array since any that fail will |
|
641 // go back to mPending |
|
642 nsDeque temp; |
|
643 DataChannel *temp_channel; // really already_AddRefed<> |
|
644 while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) { |
|
645 temp.Push(static_cast<void *>(temp_channel)); |
|
646 } |
|
647 |
|
648 nsRefPtr<DataChannel> channel; |
|
649 // All these entries have an AddRef(); make that explicit now via the dont_AddRef() |
|
650 while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) { |
|
651 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { |
|
652 LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream)); |
|
653 channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN; |
|
654 // OpenFinish returns a reference itself, so we need to take it can Release it |
|
655 channel = OpenFinish(channel.forget()); // may reset the flag and re-push |
|
656 } else { |
|
657 NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?"); |
|
658 } |
|
659 } |
|
660 |
|
661 } |
|
662 void |
|
663 DataChannelConnection::SctpDtlsInput(TransportFlow *flow, |
|
664 const unsigned char *data, size_t len) |
|
665 { |
|
666 #ifdef PR_LOGGING |
|
667 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) { |
|
668 char *buf; |
|
669 |
|
670 if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) { |
|
671 PR_LogPrint("%s", buf); |
|
672 usrsctp_freedumpbuffer(buf); |
|
673 } |
|
674 } |
|
675 #endif |
|
676 // Pass the data to SCTP |
|
677 usrsctp_conninput(static_cast<void *>(this), data, len, 0); |
|
678 } |
|
679 |
|
680 int |
|
681 DataChannelConnection::SendPacket(const unsigned char *data, size_t len, bool release) |
|
682 { |
|
683 //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len)); |
|
684 int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0; |
|
685 if (release) |
|
686 delete data; |
|
687 return res; |
|
688 } |
|
689 |
|
690 /* static */ |
|
691 int |
|
692 DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length, |
|
693 uint8_t tos, uint8_t set_df) |
|
694 { |
|
695 DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr); |
|
696 int res; |
|
697 |
|
698 #ifdef PR_LOGGING |
|
699 if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) { |
|
700 char *buf; |
|
701 |
|
702 if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) { |
|
703 PR_LogPrint("%s", buf); |
|
704 usrsctp_freedumpbuffer(buf); |
|
705 } |
|
706 } |
|
707 #endif |
|
708 // We're async proxying even if on the STSThread because this is called |
|
709 // with internal SCTP locks held in some cases (such as in usrsctp_connect()). |
|
710 // SCTP has an option for Apple, on IP connections only, to release at least |
|
711 // one of the locks before calling a packet output routine; with changes to |
|
712 // the underlying SCTP stack this might remove the need to use an async proxy. |
|
713 if (0 /*peer->IsSTSThread()*/) { |
|
714 res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false); |
|
715 } else { |
|
716 unsigned char *data = new unsigned char[length]; |
|
717 memcpy(data, buffer, length); |
|
718 res = -1; |
|
719 // XXX It might be worthwhile to add an assertion against the thread |
|
720 // somehow getting into the DataChannel/SCTP code again, as |
|
721 // DISPATCH_SYNC is not fully blocking. This may be tricky, as it |
|
722 // needs to be a per-thread check, not a global. |
|
723 peer->mSTS->Dispatch(WrapRunnable( |
|
724 nsRefPtr<DataChannelConnection>(peer), |
|
725 &DataChannelConnection::SendPacket, data, length, true), |
|
726 NS_DISPATCH_NORMAL); |
|
727 res = 0; // cheat! Packets can always be dropped later anyways |
|
728 } |
|
729 return res; |
|
730 } |
|
731 #endif |
|
732 |
|
733 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT |
|
734 // listen for incoming associations |
|
735 // Blocks! - Don't call this from main thread! |
|
736 |
|
737 #error This code will not work as-is since SetEvenOdd() runs on Mainthread |
|
738 |
|
739 bool |
|
740 DataChannelConnection::Listen(unsigned short port) |
|
741 { |
|
742 struct sockaddr_in addr; |
|
743 socklen_t addr_len; |
|
744 |
|
745 NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); |
|
746 |
|
747 /* Acting as the 'server' */ |
|
748 memset((void *)&addr, 0, sizeof(addr)); |
|
749 #ifdef HAVE_SIN_LEN |
|
750 addr.sin_len = sizeof(struct sockaddr_in); |
|
751 #endif |
|
752 addr.sin_family = AF_INET; |
|
753 addr.sin_port = htons(port); |
|
754 addr.sin_addr.s_addr = htonl(INADDR_ANY); |
|
755 LOG(("Waiting for connections on port %u", ntohs(addr.sin_port))); |
|
756 mState = CONNECTING; |
|
757 if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) { |
|
758 LOG(("***Failed userspace_bind")); |
|
759 return false; |
|
760 } |
|
761 if (usrsctp_listen(mMasterSocket, 1) < 0) { |
|
762 LOG(("***Failed userspace_listen")); |
|
763 return false; |
|
764 } |
|
765 |
|
766 LOG(("Accepting connection")); |
|
767 addr_len = 0; |
|
768 if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) { |
|
769 LOG(("***Failed accept")); |
|
770 return false; |
|
771 } |
|
772 mState = OPEN; |
|
773 |
|
774 struct linger l; |
|
775 l.l_onoff = 1; |
|
776 l.l_linger = 0; |
|
777 if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, |
|
778 (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { |
|
779 LOG(("Couldn't set SO_LINGER on SCTP socket")); |
|
780 } |
|
781 |
|
782 SetEvenOdd(); |
|
783 |
|
784 // Notify Connection open |
|
785 // XXX We need to make sure connection sticks around until the message is delivered |
|
786 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); |
|
787 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
788 DataChannelOnMessageAvailable::ON_CONNECTION, |
|
789 this, (DataChannel *) nullptr)); |
|
790 return true; |
|
791 } |
|
792 |
|
793 // Blocks! - Don't call this from main thread! |
|
794 bool |
|
795 DataChannelConnection::Connect(const char *addr, unsigned short port) |
|
796 { |
|
797 struct sockaddr_in addr4; |
|
798 struct sockaddr_in6 addr6; |
|
799 |
|
800 NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); |
|
801 |
|
802 /* Acting as the connector */ |
|
803 LOG(("Connecting to %s, port %u", addr, port)); |
|
804 memset((void *)&addr4, 0, sizeof(struct sockaddr_in)); |
|
805 memset((void *)&addr6, 0, sizeof(struct sockaddr_in6)); |
|
806 #ifdef HAVE_SIN_LEN |
|
807 addr4.sin_len = sizeof(struct sockaddr_in); |
|
808 #endif |
|
809 #ifdef HAVE_SIN6_LEN |
|
810 addr6.sin6_len = sizeof(struct sockaddr_in6); |
|
811 #endif |
|
812 addr4.sin_family = AF_INET; |
|
813 addr6.sin6_family = AF_INET6; |
|
814 addr4.sin_port = htons(port); |
|
815 addr6.sin6_port = htons(port); |
|
816 mState = CONNECTING; |
|
817 |
|
818 #if !defined(__Userspace_os_Windows) |
|
819 if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) { |
|
820 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) { |
|
821 LOG(("*** Failed userspace_connect")); |
|
822 return false; |
|
823 } |
|
824 } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) { |
|
825 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) { |
|
826 LOG(("*** Failed userspace_connect")); |
|
827 return false; |
|
828 } |
|
829 } else { |
|
830 LOG(("*** Illegal destination address.")); |
|
831 } |
|
832 #else |
|
833 { |
|
834 struct sockaddr_storage ss; |
|
835 int sslen = sizeof(ss); |
|
836 |
|
837 if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) { |
|
838 addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr; |
|
839 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) { |
|
840 LOG(("*** Failed userspace_connect")); |
|
841 return false; |
|
842 } |
|
843 } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) { |
|
844 addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr; |
|
845 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) { |
|
846 LOG(("*** Failed userspace_connect")); |
|
847 return false; |
|
848 } |
|
849 } else { |
|
850 LOG(("*** Illegal destination address.")); |
|
851 } |
|
852 } |
|
853 #endif |
|
854 |
|
855 mSocket = mMasterSocket; |
|
856 |
|
857 LOG(("connect() succeeded! Entering connected mode")); |
|
858 mState = OPEN; |
|
859 |
|
860 SetEvenOdd(); |
|
861 |
|
862 // Notify Connection open |
|
863 // XXX We need to make sure connection sticks around until the message is delivered |
|
864 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); |
|
865 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
866 DataChannelOnMessageAvailable::ON_CONNECTION, |
|
867 this, (DataChannel *) nullptr)); |
|
868 return true; |
|
869 } |
|
870 #endif |
|
871 |
|
872 DataChannel * |
|
873 DataChannelConnection::FindChannelByStream(uint16_t stream) |
|
874 { |
|
875 return mStreams.SafeElementAt(stream); |
|
876 } |
|
877 |
|
878 uint16_t |
|
879 DataChannelConnection::FindFreeStream() |
|
880 { |
|
881 uint32_t i, j, limit; |
|
882 |
|
883 limit = mStreams.Length(); |
|
884 if (limit > MAX_NUM_STREAMS) |
|
885 limit = MAX_NUM_STREAMS; |
|
886 |
|
887 for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) { |
|
888 if (!mStreams[i]) { |
|
889 // Verify it's not still in the process of closing |
|
890 for (j = 0; j < mStreamsResetting.Length(); ++j) { |
|
891 if (mStreamsResetting[j] == i) { |
|
892 break; |
|
893 } |
|
894 } |
|
895 if (j == mStreamsResetting.Length()) |
|
896 break; |
|
897 } |
|
898 } |
|
899 if (i >= limit) { |
|
900 return INVALID_STREAM; |
|
901 } |
|
902 return i; |
|
903 } |
|
904 |
|
905 bool |
|
906 DataChannelConnection::RequestMoreStreams(int32_t aNeeded) |
|
907 { |
|
908 struct sctp_status status; |
|
909 struct sctp_add_streams sas; |
|
910 uint32_t outStreamsNeeded; |
|
911 socklen_t len; |
|
912 |
|
913 if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) { |
|
914 aNeeded = MAX_NUM_STREAMS - mStreams.Length(); |
|
915 } |
|
916 if (aNeeded <= 0) { |
|
917 return false; |
|
918 } |
|
919 |
|
920 len = (socklen_t)sizeof(struct sctp_status); |
|
921 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) { |
|
922 LOG(("***failed: getsockopt SCTP_STATUS")); |
|
923 return false; |
|
924 } |
|
925 outStreamsNeeded = aNeeded; // number to add |
|
926 |
|
927 // Note: if multiple channel opens happen when we don't have enough space, |
|
928 // we'll call RequestMoreStreams() multiple times |
|
929 memset(&sas, 0, sizeof(sas)); |
|
930 sas.sas_instrms = 0; |
|
931 sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */ |
|
932 // Doesn't block, we get an event when it succeeds or fails |
|
933 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, |
|
934 (socklen_t) sizeof(struct sctp_add_streams)) < 0) { |
|
935 if (errno == EALREADY) { |
|
936 LOG(("Already have %u output streams", outStreamsNeeded)); |
|
937 return true; |
|
938 } |
|
939 |
|
940 LOG(("***failed: setsockopt ADD errno=%d", errno)); |
|
941 return false; |
|
942 } |
|
943 LOG(("Requested %u more streams", outStreamsNeeded)); |
|
944 // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the |
|
945 // values are larger than mStreams.Length() |
|
946 return true; |
|
947 } |
|
948 |
|
949 int32_t |
|
950 DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream) |
|
951 { |
|
952 struct sctp_sndinfo sndinfo; |
|
953 |
|
954 // Note: Main-thread IO, but doesn't block |
|
955 memset(&sndinfo, 0, sizeof(struct sctp_sndinfo)); |
|
956 sndinfo.snd_sid = stream; |
|
957 sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); |
|
958 if (usrsctp_sendv(mSocket, msg, len, nullptr, 0, |
|
959 &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), |
|
960 SCTP_SENDV_SNDINFO, 0) < 0) { |
|
961 //LOG(("***failed: sctp_sendv")); don't log because errno is a return! |
|
962 return (0); |
|
963 } |
|
964 return (1); |
|
965 } |
|
966 |
|
967 int32_t |
|
968 DataChannelConnection::SendOpenAckMessage(uint16_t stream) |
|
969 { |
|
970 struct rtcweb_datachannel_ack ack; |
|
971 |
|
972 memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack)); |
|
973 ack.msg_type = DATA_CHANNEL_ACK; |
|
974 |
|
975 return SendControlMessage(&ack, sizeof(ack), stream); |
|
976 } |
|
977 |
|
978 int32_t |
|
979 DataChannelConnection::SendOpenRequestMessage(const nsACString& label, |
|
980 const nsACString& protocol, |
|
981 uint16_t stream, bool unordered, |
|
982 uint16_t prPolicy, uint32_t prValue) |
|
983 { |
|
984 int label_len = label.Length(); // not including nul |
|
985 int proto_len = protocol.Length(); // not including nul |
|
986 struct rtcweb_datachannel_open_request *req = |
|
987 (struct rtcweb_datachannel_open_request*) moz_xmalloc((sizeof(*req)-1) + label_len + proto_len); |
|
988 // careful - request includes 1 char label |
|
989 |
|
990 memset(req, 0, sizeof(struct rtcweb_datachannel_open_request)); |
|
991 req->msg_type = DATA_CHANNEL_OPEN_REQUEST; |
|
992 switch (prPolicy) { |
|
993 case SCTP_PR_SCTP_NONE: |
|
994 req->channel_type = DATA_CHANNEL_RELIABLE; |
|
995 break; |
|
996 case SCTP_PR_SCTP_TTL: |
|
997 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED; |
|
998 break; |
|
999 case SCTP_PR_SCTP_RTX: |
|
1000 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT; |
|
1001 break; |
|
1002 default: |
|
1003 // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno! |
|
1004 moz_free(req); |
|
1005 return (0); |
|
1006 } |
|
1007 if (unordered) { |
|
1008 // Per the current types, all differ by 0x80 between ordered and unordered |
|
1009 req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future |
|
1010 } |
|
1011 |
|
1012 req->reliability_param = htonl(prValue); |
|
1013 req->priority = htons(0); /* XXX: add support */ |
|
1014 req->label_length = htons(label_len); |
|
1015 req->protocol_length = htons(proto_len); |
|
1016 memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len); |
|
1017 memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len); |
|
1018 |
|
1019 // sizeof(*req) already includes +1 byte for label, need nul for both strings |
|
1020 int32_t result = SendControlMessage(req, (sizeof(*req)-1) + label_len + proto_len, stream); |
|
1021 |
|
1022 moz_free(req); |
|
1023 return result; |
|
1024 } |
|
1025 |
|
1026 // XXX This should use a separate thread (outbound queue) which should |
|
1027 // select() to know when to *try* to send data to the socket again. |
|
1028 // Alternatively, it can use a timeout, but that's guaranteed to be wrong |
|
1029 // (just not sure in what direction). We could re-implement NSPR's |
|
1030 // PR_POLL_WRITE/etc handling... with a lot of work. |
|
1031 |
|
1032 // Better yet, use the SCTP stack's notifications on buffer state to avoid |
|
1033 // filling the SCTP's buffers. |
|
1034 |
|
1035 // returns if we're still blocked or not |
|
1036 bool |
|
1037 DataChannelConnection::SendDeferredMessages() |
|
1038 { |
|
1039 uint32_t i; |
|
1040 nsRefPtr<DataChannel> channel; // we may null out the refs to this |
|
1041 bool still_blocked = false; |
|
1042 bool sent = false; |
|
1043 |
|
1044 // This may block while something is modifying channels, but should not block for IO |
|
1045 MutexAutoLock lock(mLock); |
|
1046 |
|
1047 // XXX For total fairness, on a still_blocked we'd start next time at the |
|
1048 // same index. Sorry, not going to bother for now. |
|
1049 for (i = 0; i < mStreams.Length(); ++i) { |
|
1050 channel = mStreams[i]; |
|
1051 if (!channel) |
|
1052 continue; |
|
1053 |
|
1054 // Only one of these should be set.... |
|
1055 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) { |
|
1056 if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol, |
|
1057 channel->mStream, |
|
1058 channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED, |
|
1059 channel->mPrPolicy, channel->mPrValue)) { |
|
1060 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ; |
|
1061 |
|
1062 channel->mState = OPEN; |
|
1063 channel->mReady = true; |
|
1064 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); |
|
1065 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
1066 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, |
|
1067 channel)); |
|
1068 sent = true; |
|
1069 } else { |
|
1070 if (errno == EAGAIN || errno == EWOULDBLOCK) { |
|
1071 still_blocked = true; |
|
1072 } else { |
|
1073 // Close the channel, inform the user |
|
1074 mStreams[channel->mStream] = nullptr; |
|
1075 channel->mState = CLOSED; |
|
1076 // Don't need to reset; we didn't open it |
|
1077 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
1078 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
|
1079 channel)); |
|
1080 } |
|
1081 } |
|
1082 } |
|
1083 if (still_blocked) |
|
1084 break; |
|
1085 |
|
1086 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) { |
|
1087 if (SendOpenAckMessage(channel->mStream)) { |
|
1088 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK; |
|
1089 sent = true; |
|
1090 } else { |
|
1091 if (errno == EAGAIN || errno == EWOULDBLOCK) { |
|
1092 still_blocked = true; |
|
1093 } else { |
|
1094 // Close the channel, inform the user |
|
1095 CloseInt(channel); |
|
1096 // XXX send error via DataChannelOnMessageAvailable (bug 843625) |
|
1097 } |
|
1098 } |
|
1099 } |
|
1100 if (still_blocked) |
|
1101 break; |
|
1102 |
|
1103 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) { |
|
1104 bool failed_send = false; |
|
1105 int32_t result; |
|
1106 |
|
1107 if (channel->mState == CLOSED || channel->mState == CLOSING) { |
|
1108 channel->mBufferedData.Clear(); |
|
1109 } |
|
1110 while (!channel->mBufferedData.IsEmpty() && |
|
1111 !failed_send) { |
|
1112 struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa; |
|
1113 const char *data = channel->mBufferedData[0]->mData; |
|
1114 uint32_t len = channel->mBufferedData[0]->mLength; |
|
1115 |
|
1116 // SCTP will return EMSGSIZE if the message is bigger than the buffer |
|
1117 // size (or EAGAIN if there isn't space) |
|
1118 if ((result = usrsctp_sendv(mSocket, data, len, |
|
1119 nullptr, 0, |
|
1120 (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa), |
|
1121 SCTP_SENDV_SPA, |
|
1122 0) < 0)) { |
|
1123 if (errno == EAGAIN || errno == EWOULDBLOCK) { |
|
1124 // leave queued for resend |
|
1125 failed_send = true; |
|
1126 LOG(("queue full again when resending %d bytes (%d)", len, result)); |
|
1127 } else { |
|
1128 LOG(("error %d re-sending string", errno)); |
|
1129 failed_send = true; |
|
1130 } |
|
1131 } else { |
|
1132 LOG(("Resent buffer of %d bytes (%d)", len, result)); |
|
1133 sent = true; |
|
1134 channel->mBufferedData.RemoveElementAt(0); |
|
1135 } |
|
1136 } |
|
1137 if (channel->mBufferedData.IsEmpty()) |
|
1138 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA; |
|
1139 else |
|
1140 still_blocked = true; |
|
1141 } |
|
1142 if (still_blocked) |
|
1143 break; |
|
1144 } |
|
1145 |
|
1146 if (!still_blocked) { |
|
1147 // mDeferTimeout becomes an estimate of how long we need to wait next time we block |
|
1148 return false; |
|
1149 } |
|
1150 // adjust time? More time for next wait if we didn't send anything, less if did |
|
1151 // Pretty crude, but better than nothing; just to keep CPU use down |
|
1152 if (!sent && mDeferTimeout < 50) |
|
1153 mDeferTimeout++; |
|
1154 else if (sent && mDeferTimeout > 10) |
|
1155 mDeferTimeout--; |
|
1156 |
|
1157 return true; |
|
1158 } |
|
1159 |
|
1160 void |
|
1161 DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, |
|
1162 size_t length, |
|
1163 uint16_t stream) |
|
1164 { |
|
1165 nsRefPtr<DataChannel> channel; |
|
1166 uint32_t prValue; |
|
1167 uint16_t prPolicy; |
|
1168 uint32_t flags; |
|
1169 |
|
1170 mLock.AssertCurrentThreadOwns(); |
|
1171 |
|
1172 if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) { |
|
1173 LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length, |
|
1174 (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))); |
|
1175 if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) |
|
1176 return; |
|
1177 } |
|
1178 |
|
1179 LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req))); |
|
1180 |
|
1181 switch (req->channel_type) { |
|
1182 case DATA_CHANNEL_RELIABLE: |
|
1183 case DATA_CHANNEL_RELIABLE_UNORDERED: |
|
1184 prPolicy = SCTP_PR_SCTP_NONE; |
|
1185 break; |
|
1186 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: |
|
1187 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED: |
|
1188 prPolicy = SCTP_PR_SCTP_RTX; |
|
1189 break; |
|
1190 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: |
|
1191 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED: |
|
1192 prPolicy = SCTP_PR_SCTP_TTL; |
|
1193 break; |
|
1194 default: |
|
1195 LOG(("Unknown channel type", req->channel_type)); |
|
1196 /* XXX error handling */ |
|
1197 return; |
|
1198 } |
|
1199 prValue = ntohl(req->reliability_param); |
|
1200 flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; |
|
1201 |
|
1202 if ((channel = FindChannelByStream(stream))) { |
|
1203 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { |
|
1204 LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.", |
|
1205 stream, channel->mState)); |
|
1206 /* XXX: some error handling */ |
|
1207 } else { |
|
1208 LOG(("Open for externally negotiated channel %u", stream)); |
|
1209 // XXX should also check protocol, maybe label |
|
1210 if (prPolicy != channel->mPrPolicy || |
|
1211 prValue != channel->mPrValue || |
|
1212 flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) |
|
1213 { |
|
1214 LOG(("WARNING: external negotiation mismatch with OpenRequest:" |
|
1215 "channel %u, policy %u/%u, value %u/%u, flags %x/%x", |
|
1216 stream, prPolicy, channel->mPrPolicy, |
|
1217 prValue, channel->mPrValue, flags, channel->mFlags)); |
|
1218 } |
|
1219 } |
|
1220 return; |
|
1221 } |
|
1222 if (stream >= mStreams.Length()) { |
|
1223 LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length())); |
|
1224 return; |
|
1225 } |
|
1226 |
|
1227 nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length))); |
|
1228 nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)], |
|
1229 ntohs(req->protocol_length))); |
|
1230 |
|
1231 channel = new DataChannel(this, |
|
1232 stream, |
|
1233 DataChannel::CONNECTING, |
|
1234 label, |
|
1235 protocol, |
|
1236 prPolicy, prValue, |
|
1237 flags, |
|
1238 nullptr, nullptr); |
|
1239 mStreams[stream] = channel; |
|
1240 |
|
1241 channel->mState = DataChannel::WAITING_TO_OPEN; |
|
1242 |
|
1243 LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__, |
|
1244 channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState)); |
|
1245 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
1246 DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, |
|
1247 this, channel)); |
|
1248 |
|
1249 LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); |
|
1250 |
|
1251 if (!SendOpenAckMessage(stream)) { |
|
1252 // XXX Only on EAGAIN!? And if not, then close the channel?? |
|
1253 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK; |
|
1254 StartDefer(); |
|
1255 } |
|
1256 |
|
1257 // Now process any queued data messages for the channel (which will |
|
1258 // themselves likely get queued until we leave WAITING_TO_OPEN, plus any |
|
1259 // more that come in before that happens) |
|
1260 DeliverQueuedData(stream); |
|
1261 } |
|
1262 |
|
1263 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. |
|
1264 // That would make this code moot. Keep it for now for backwards compatibility. |
|
1265 void |
|
1266 DataChannelConnection::DeliverQueuedData(uint16_t stream) |
|
1267 { |
|
1268 mLock.AssertCurrentThreadOwns(); |
|
1269 |
|
1270 uint32_t i = 0; |
|
1271 while (i < mQueuedData.Length()) { |
|
1272 // Careful! we may modify the array length from within the loop! |
|
1273 if (mQueuedData[i]->mStream == stream) { |
|
1274 LOG(("Delivering queued data for stream %u, length %u", |
|
1275 stream, mQueuedData[i]->mLength)); |
|
1276 // Deliver the queued data |
|
1277 HandleDataMessage(mQueuedData[i]->mPpid, |
|
1278 mQueuedData[i]->mData, mQueuedData[i]->mLength, |
|
1279 mQueuedData[i]->mStream); |
|
1280 mQueuedData.RemoveElementAt(i); |
|
1281 continue; // don't bump index since we removed the element |
|
1282 } |
|
1283 i++; |
|
1284 } |
|
1285 } |
|
1286 |
|
1287 void |
|
1288 DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, |
|
1289 size_t length, uint16_t stream) |
|
1290 { |
|
1291 DataChannel *channel; |
|
1292 |
|
1293 mLock.AssertCurrentThreadOwns(); |
|
1294 |
|
1295 channel = FindChannelByStream(stream); |
|
1296 NS_ENSURE_TRUE_VOID(channel); |
|
1297 |
|
1298 LOG(("OpenAck received for stream %u, waiting=%d", stream, |
|
1299 (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0)); |
|
1300 |
|
1301 channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK; |
|
1302 } |
|
1303 |
|
1304 void |
|
1305 DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream) |
|
1306 { |
|
1307 /* XXX: Send an error message? */ |
|
1308 LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream)); |
|
1309 // XXX Log to JS error console if possible |
|
1310 } |
|
1311 |
|
1312 void |
|
1313 DataChannelConnection::HandleDataMessage(uint32_t ppid, |
|
1314 const void *data, size_t length, |
|
1315 uint16_t stream) |
|
1316 { |
|
1317 DataChannel *channel; |
|
1318 const char *buffer = (const char *) data; |
|
1319 |
|
1320 mLock.AssertCurrentThreadOwns(); |
|
1321 |
|
1322 channel = FindChannelByStream(stream); |
|
1323 |
|
1324 // XXX A closed channel may trip this... check |
|
1325 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. |
|
1326 // That would make this code moot. Keep it for now for backwards compatibility. |
|
1327 if (!channel) { |
|
1328 // In the updated 0-RTT open case, the sender can send data immediately |
|
1329 // after Open, and doesn't set the in-order bit (since we don't have a |
|
1330 // response or ack). Also, with external negotiation, data can come in |
|
1331 // before we're told about the external negotiation. We need to buffer |
|
1332 // data until either a) Open comes in, if the ordering get messed up, |
|
1333 // or b) the app tells us this channel was externally negotiated. When |
|
1334 // these occur, we deliver the data. |
|
1335 |
|
1336 // Since this is rare and non-performance, keep a single list of queued |
|
1337 // data messages to deliver once the channel opens. |
|
1338 LOG(("Queuing data for stream %u, length %u", stream, length)); |
|
1339 // Copies data |
|
1340 mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length)); |
|
1341 return; |
|
1342 } |
|
1343 |
|
1344 // XXX should this be a simple if, no warnings/debugbreaks? |
|
1345 NS_ENSURE_TRUE_VOID(channel->mState != CLOSED); |
|
1346 |
|
1347 { |
|
1348 nsAutoCString recvData(buffer, length); // copies (<64) or allocates |
|
1349 bool is_binary = true; |
|
1350 |
|
1351 if (ppid == DATA_CHANNEL_PPID_DOMSTRING || |
|
1352 ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) { |
|
1353 is_binary = false; |
|
1354 } |
|
1355 if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) { |
|
1356 NS_WARNING("DataChannel message aborted by fragment type change!"); |
|
1357 channel->mRecvBuffer.Truncate(0); |
|
1358 } |
|
1359 channel->mIsRecvBinary = is_binary; |
|
1360 |
|
1361 switch (ppid) { |
|
1362 case DATA_CHANNEL_PPID_DOMSTRING: |
|
1363 case DATA_CHANNEL_PPID_BINARY: |
|
1364 channel->mRecvBuffer += recvData; |
|
1365 LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u", |
|
1366 is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(), |
|
1367 channel->mStream)); |
|
1368 return; // Not ready to notify application |
|
1369 |
|
1370 case DATA_CHANNEL_PPID_DOMSTRING_LAST: |
|
1371 LOG(("DataChannel: String message received of length %lu on channel %u", |
|
1372 length, channel->mStream)); |
|
1373 if (!channel->mRecvBuffer.IsEmpty()) { |
|
1374 channel->mRecvBuffer += recvData; |
|
1375 LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel)); |
|
1376 channel->SendOrQueue(new DataChannelOnMessageAvailable( |
|
1377 DataChannelOnMessageAvailable::ON_DATA, this, |
|
1378 channel, channel->mRecvBuffer, -1)); |
|
1379 channel->mRecvBuffer.Truncate(0); |
|
1380 return; |
|
1381 } |
|
1382 // else send using recvData normally |
|
1383 length = -1; // Flag for DOMString |
|
1384 |
|
1385 // WebSockets checks IsUTF8() here; we can try to deliver it |
|
1386 break; |
|
1387 |
|
1388 case DATA_CHANNEL_PPID_BINARY_LAST: |
|
1389 LOG(("DataChannel: Received binary message of length %lu on channel id %u", |
|
1390 length, channel->mStream)); |
|
1391 if (!channel->mRecvBuffer.IsEmpty()) { |
|
1392 channel->mRecvBuffer += recvData; |
|
1393 LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel)); |
|
1394 channel->SendOrQueue(new DataChannelOnMessageAvailable( |
|
1395 DataChannelOnMessageAvailable::ON_DATA, this, |
|
1396 channel, channel->mRecvBuffer, |
|
1397 channel->mRecvBuffer.Length())); |
|
1398 channel->mRecvBuffer.Truncate(0); |
|
1399 return; |
|
1400 } |
|
1401 // else send using recvData normally |
|
1402 break; |
|
1403 |
|
1404 default: |
|
1405 NS_ERROR("Unknown data PPID"); |
|
1406 return; |
|
1407 } |
|
1408 /* Notify onmessage */ |
|
1409 LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel)); |
|
1410 channel->SendOrQueue(new DataChannelOnMessageAvailable( |
|
1411 DataChannelOnMessageAvailable::ON_DATA, this, |
|
1412 channel, recvData, length)); |
|
1413 } |
|
1414 } |
|
1415 |
|
1416 // Called with mLock locked! |
|
1417 void |
|
1418 DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream) |
|
1419 { |
|
1420 const struct rtcweb_datachannel_open_request *req; |
|
1421 const struct rtcweb_datachannel_ack *ack; |
|
1422 |
|
1423 mLock.AssertCurrentThreadOwns(); |
|
1424 |
|
1425 switch (ppid) { |
|
1426 case DATA_CHANNEL_PPID_CONTROL: |
|
1427 req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer); |
|
1428 |
|
1429 NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message |
|
1430 switch (req->msg_type) { |
|
1431 case DATA_CHANNEL_OPEN_REQUEST: |
|
1432 // structure includes a possibly-unused char label[1] (in a packed structure) |
|
1433 NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1); |
|
1434 |
|
1435 HandleOpenRequestMessage(req, length, stream); |
|
1436 break; |
|
1437 case DATA_CHANNEL_ACK: |
|
1438 // >= sizeof(*ack) checked above |
|
1439 |
|
1440 ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer); |
|
1441 HandleOpenAckMessage(ack, length, stream); |
|
1442 break; |
|
1443 default: |
|
1444 HandleUnknownMessage(ppid, length, stream); |
|
1445 break; |
|
1446 } |
|
1447 break; |
|
1448 case DATA_CHANNEL_PPID_DOMSTRING: |
|
1449 case DATA_CHANNEL_PPID_DOMSTRING_LAST: |
|
1450 case DATA_CHANNEL_PPID_BINARY: |
|
1451 case DATA_CHANNEL_PPID_BINARY_LAST: |
|
1452 HandleDataMessage(ppid, buffer, length, stream); |
|
1453 break; |
|
1454 default: |
|
1455 LOG(("Message of length %lu, PPID %u on stream %u received.", |
|
1456 length, ppid, stream)); |
|
1457 break; |
|
1458 } |
|
1459 } |
|
1460 |
|
1461 void |
|
1462 DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac) |
|
1463 { |
|
1464 uint32_t i, n; |
|
1465 |
|
1466 switch (sac->sac_state) { |
|
1467 case SCTP_COMM_UP: |
|
1468 LOG(("Association change: SCTP_COMM_UP")); |
|
1469 if (mState == CONNECTING) { |
|
1470 mSocket = mMasterSocket; |
|
1471 mState = OPEN; |
|
1472 |
|
1473 SetEvenOdd(); |
|
1474 |
|
1475 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
1476 DataChannelOnMessageAvailable::ON_CONNECTION, |
|
1477 this, true)); |
|
1478 LOG(("DTLS connect() succeeded! Entering connected mode")); |
|
1479 |
|
1480 // Open any streams pending... |
|
1481 ProcessQueuedOpens(); |
|
1482 |
|
1483 } else if (mState == OPEN) { |
|
1484 LOG(("DataConnection Already OPEN")); |
|
1485 } else { |
|
1486 LOG(("Unexpected state: %d", mState)); |
|
1487 } |
|
1488 break; |
|
1489 case SCTP_COMM_LOST: |
|
1490 LOG(("Association change: SCTP_COMM_LOST")); |
|
1491 // This association is toast, so also close all the channels -- from mainthread! |
|
1492 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
1493 DataChannelOnMessageAvailable::ON_DISCONNECTED, |
|
1494 this)); |
|
1495 break; |
|
1496 case SCTP_RESTART: |
|
1497 LOG(("Association change: SCTP_RESTART")); |
|
1498 break; |
|
1499 case SCTP_SHUTDOWN_COMP: |
|
1500 LOG(("Association change: SCTP_SHUTDOWN_COMP")); |
|
1501 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
1502 DataChannelOnMessageAvailable::ON_DISCONNECTED, |
|
1503 this)); |
|
1504 break; |
|
1505 case SCTP_CANT_STR_ASSOC: |
|
1506 LOG(("Association change: SCTP_CANT_STR_ASSOC")); |
|
1507 break; |
|
1508 default: |
|
1509 LOG(("Association change: UNKNOWN")); |
|
1510 break; |
|
1511 } |
|
1512 LOG(("Association change: streams (in/out) = (%u/%u)", |
|
1513 sac->sac_inbound_streams, sac->sac_outbound_streams)); |
|
1514 |
|
1515 NS_ENSURE_TRUE_VOID(sac); |
|
1516 n = sac->sac_length - sizeof(*sac); |
|
1517 if (((sac->sac_state == SCTP_COMM_UP) || |
|
1518 (sac->sac_state == SCTP_RESTART)) && (n > 0)) { |
|
1519 for (i = 0; i < n; ++i) { |
|
1520 switch (sac->sac_info[i]) { |
|
1521 case SCTP_ASSOC_SUPPORTS_PR: |
|
1522 LOG(("Supports: PR")); |
|
1523 break; |
|
1524 case SCTP_ASSOC_SUPPORTS_AUTH: |
|
1525 LOG(("Supports: AUTH")); |
|
1526 break; |
|
1527 case SCTP_ASSOC_SUPPORTS_ASCONF: |
|
1528 LOG(("Supports: ASCONF")); |
|
1529 break; |
|
1530 case SCTP_ASSOC_SUPPORTS_MULTIBUF: |
|
1531 LOG(("Supports: MULTIBUF")); |
|
1532 break; |
|
1533 case SCTP_ASSOC_SUPPORTS_RE_CONFIG: |
|
1534 LOG(("Supports: RE-CONFIG")); |
|
1535 break; |
|
1536 default: |
|
1537 LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i])); |
|
1538 break; |
|
1539 } |
|
1540 } |
|
1541 } else if (((sac->sac_state == SCTP_COMM_LOST) || |
|
1542 (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) { |
|
1543 LOG(("Association: ABORT =")); |
|
1544 for (i = 0; i < n; ++i) { |
|
1545 LOG((" 0x%02x", sac->sac_info[i])); |
|
1546 } |
|
1547 } |
|
1548 if ((sac->sac_state == SCTP_CANT_STR_ASSOC) || |
|
1549 (sac->sac_state == SCTP_SHUTDOWN_COMP) || |
|
1550 (sac->sac_state == SCTP_COMM_LOST)) { |
|
1551 return; |
|
1552 } |
|
1553 } |
|
1554 |
|
1555 void |
|
1556 DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc) |
|
1557 { |
|
1558 char addr_buf[INET6_ADDRSTRLEN]; |
|
1559 const char *addr = ""; |
|
1560 struct sockaddr_in *sin; |
|
1561 struct sockaddr_in6 *sin6; |
|
1562 #if defined(__Userspace_os_Windows) |
|
1563 DWORD addr_len = INET6_ADDRSTRLEN; |
|
1564 #endif |
|
1565 |
|
1566 switch (spc->spc_aaddr.ss_family) { |
|
1567 case AF_INET: |
|
1568 sin = (struct sockaddr_in *)&spc->spc_aaddr; |
|
1569 #if !defined(__Userspace_os_Windows) |
|
1570 addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN); |
|
1571 #else |
|
1572 if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr, |
|
1573 addr_buf, &addr_len)) { |
|
1574 return; |
|
1575 } |
|
1576 #endif |
|
1577 break; |
|
1578 case AF_INET6: |
|
1579 sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr; |
|
1580 #if !defined(__Userspace_os_Windows) |
|
1581 addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN); |
|
1582 #else |
|
1583 if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr, |
|
1584 addr_buf, &addr_len)) { |
|
1585 return; |
|
1586 } |
|
1587 #endif |
|
1588 case AF_CONN: |
|
1589 addr = "DTLS connection"; |
|
1590 break; |
|
1591 default: |
|
1592 break; |
|
1593 } |
|
1594 LOG(("Peer address %s is now ", addr)); |
|
1595 switch (spc->spc_state) { |
|
1596 case SCTP_ADDR_AVAILABLE: |
|
1597 LOG(("SCTP_ADDR_AVAILABLE")); |
|
1598 break; |
|
1599 case SCTP_ADDR_UNREACHABLE: |
|
1600 LOG(("SCTP_ADDR_UNREACHABLE")); |
|
1601 break; |
|
1602 case SCTP_ADDR_REMOVED: |
|
1603 LOG(("SCTP_ADDR_REMOVED")); |
|
1604 break; |
|
1605 case SCTP_ADDR_ADDED: |
|
1606 LOG(("SCTP_ADDR_ADDED")); |
|
1607 break; |
|
1608 case SCTP_ADDR_MADE_PRIM: |
|
1609 LOG(("SCTP_ADDR_MADE_PRIM")); |
|
1610 break; |
|
1611 case SCTP_ADDR_CONFIRMED: |
|
1612 LOG(("SCTP_ADDR_CONFIRMED")); |
|
1613 break; |
|
1614 default: |
|
1615 LOG(("UNKNOWN")); |
|
1616 break; |
|
1617 } |
|
1618 LOG((" (error = 0x%08x).\n", spc->spc_error)); |
|
1619 } |
|
1620 |
|
1621 void |
|
1622 DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre) |
|
1623 { |
|
1624 size_t i, n; |
|
1625 |
|
1626 n = sre->sre_length - sizeof(struct sctp_remote_error); |
|
1627 LOG(("Remote Error (error = 0x%04x): ", sre->sre_error)); |
|
1628 for (i = 0; i < n; ++i) { |
|
1629 LOG((" 0x%02x", sre-> sre_data[i])); |
|
1630 } |
|
1631 } |
|
1632 |
|
1633 void |
|
1634 DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse) |
|
1635 { |
|
1636 LOG(("Shutdown event.")); |
|
1637 /* XXX: notify all channels. */ |
|
1638 // Attempts to actually send anything will fail |
|
1639 } |
|
1640 |
|
1641 void |
|
1642 DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai) |
|
1643 { |
|
1644 LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind)); |
|
1645 } |
|
1646 |
|
1647 void |
|
1648 DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe) |
|
1649 { |
|
1650 size_t i, n; |
|
1651 |
|
1652 if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) { |
|
1653 LOG(("Unsent ")); |
|
1654 } |
|
1655 if (ssfe->ssfe_flags & SCTP_DATA_SENT) { |
|
1656 LOG(("Sent ")); |
|
1657 } |
|
1658 if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) { |
|
1659 LOG(("(flags = %x) ", ssfe->ssfe_flags)); |
|
1660 } |
|
1661 LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x", |
|
1662 ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, |
|
1663 ssfe->ssfe_info.snd_flags, ssfe->ssfe_error)); |
|
1664 n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); |
|
1665 for (i = 0; i < n; ++i) { |
|
1666 LOG((" 0x%02x", ssfe->ssfe_data[i])); |
|
1667 } |
|
1668 } |
|
1669 |
|
1670 void |
|
1671 DataChannelConnection::ClearResets() |
|
1672 { |
|
1673 // Clear all pending resets |
|
1674 if (!mStreamsResetting.IsEmpty()) { |
|
1675 LOG(("Clearing resets for %d streams", mStreamsResetting.Length())); |
|
1676 } |
|
1677 |
|
1678 for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) { |
|
1679 nsRefPtr<DataChannel> channel; |
|
1680 channel = FindChannelByStream(mStreamsResetting[i]); |
|
1681 if (channel) { |
|
1682 LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get())); |
|
1683 mStreams[channel->mStream] = nullptr; |
|
1684 } |
|
1685 } |
|
1686 mStreamsResetting.Clear(); |
|
1687 } |
|
1688 |
|
1689 void |
|
1690 DataChannelConnection::ResetOutgoingStream(uint16_t stream) |
|
1691 { |
|
1692 uint32_t i; |
|
1693 |
|
1694 mLock.AssertCurrentThreadOwns(); |
|
1695 LOG(("Connection %p: Resetting outgoing stream %u", |
|
1696 (void *) this, stream)); |
|
1697 // Rarely has more than a couple items and only for a short time |
|
1698 for (i = 0; i < mStreamsResetting.Length(); ++i) { |
|
1699 if (mStreamsResetting[i] == stream) { |
|
1700 return; |
|
1701 } |
|
1702 } |
|
1703 mStreamsResetting.AppendElement(stream); |
|
1704 } |
|
1705 |
|
1706 void |
|
1707 DataChannelConnection::SendOutgoingStreamReset() |
|
1708 { |
|
1709 struct sctp_reset_streams *srs; |
|
1710 uint32_t i; |
|
1711 size_t len; |
|
1712 |
|
1713 LOG(("Connection %p: Sending outgoing stream reset for %d streams", |
|
1714 (void *) this, mStreamsResetting.Length())); |
|
1715 mLock.AssertCurrentThreadOwns(); |
|
1716 if (mStreamsResetting.IsEmpty()) { |
|
1717 LOG(("No streams to reset")); |
|
1718 return; |
|
1719 } |
|
1720 len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t); |
|
1721 srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc |
|
1722 memset(srs, 0, len); |
|
1723 srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; |
|
1724 srs->srs_number_streams = mStreamsResetting.Length(); |
|
1725 for (i = 0; i < mStreamsResetting.Length(); ++i) { |
|
1726 srs->srs_stream_list[i] = mStreamsResetting[i]; |
|
1727 } |
|
1728 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) { |
|
1729 LOG(("***failed: setsockopt RESET, errno %d", errno)); |
|
1730 // if errno == EALREADY, this is normal - we can't send another reset |
|
1731 // with one pending. |
|
1732 // When we get an incoming reset (which may be a response to our |
|
1733 // outstanding one), see if we have any pending outgoing resets and |
|
1734 // send them |
|
1735 } else { |
|
1736 mStreamsResetting.Clear(); |
|
1737 } |
|
1738 moz_free(srs); |
|
1739 } |
|
1740 |
|
1741 void |
|
1742 DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst) |
|
1743 { |
|
1744 uint32_t n, i; |
|
1745 nsRefPtr<DataChannel> channel; // since we may null out the ref to the channel |
|
1746 |
|
1747 if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && |
|
1748 !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { |
|
1749 n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t); |
|
1750 for (i = 0; i < n; ++i) { |
|
1751 if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { |
|
1752 channel = FindChannelByStream(strrst->strreset_stream_list[i]); |
|
1753 if (channel) { |
|
1754 // The other side closed the channel |
|
1755 // We could be in three states: |
|
1756 // 1. Normal state (input and output streams (OPEN) |
|
1757 // Notify application, send a RESET in response on our |
|
1758 // outbound channel. Go to CLOSED |
|
1759 // 2. We sent our own reset (CLOSING); either they crossed on the |
|
1760 // wire, or this is a response to our Reset. |
|
1761 // Go to CLOSED |
|
1762 // 3. We've sent a open but haven't gotten a response yet (CONNECTING) |
|
1763 // I believe this is impossible, as we don't have an input stream yet. |
|
1764 |
|
1765 LOG(("Incoming: Channel %u closed, state %d", |
|
1766 channel->mStream, channel->mState)); |
|
1767 ASSERT_WEBRTC(channel->mState == DataChannel::OPEN || |
|
1768 channel->mState == DataChannel::CLOSING || |
|
1769 channel->mState == DataChannel::CONNECTING || |
|
1770 channel->mState == DataChannel::WAITING_TO_OPEN); |
|
1771 if (channel->mState == DataChannel::OPEN || |
|
1772 channel->mState == DataChannel::WAITING_TO_OPEN) { |
|
1773 ResetOutgoingStream(channel->mStream); |
|
1774 SendOutgoingStreamReset(); |
|
1775 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
1776 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
|
1777 channel)); |
|
1778 } |
|
1779 mStreams[channel->mStream] = nullptr; |
|
1780 |
|
1781 LOG(("Disconnected DataChannel %p from connection %p", |
|
1782 (void *) channel.get(), (void *) channel->mConnection.get())); |
|
1783 channel->Destroy(); |
|
1784 // At this point when we leave here, the object is a zombie held alive only by the DOM object |
|
1785 } else { |
|
1786 LOG(("Can't find incoming channel %d",i)); |
|
1787 } |
|
1788 } |
|
1789 } |
|
1790 } |
|
1791 |
|
1792 // In case we failed to send a RESET due to having one outstanding, process any pending resets now: |
|
1793 if (!mStreamsResetting.IsEmpty()) { |
|
1794 LOG(("Sending %d pending resets", mStreamsResetting.Length())); |
|
1795 SendOutgoingStreamReset(); |
|
1796 } |
|
1797 } |
|
1798 |
|
1799 void |
|
1800 DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg) |
|
1801 { |
|
1802 uint16_t stream; |
|
1803 uint32_t i; |
|
1804 nsRefPtr<DataChannel> channel; |
|
1805 |
|
1806 if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { |
|
1807 LOG(("*** Failed increasing number of streams from %u (%u/%u)", |
|
1808 mStreams.Length(), |
|
1809 strchg->strchange_instrms, |
|
1810 strchg->strchange_outstrms)); |
|
1811 // XXX FIX! notify pending opens of failure |
|
1812 return; |
|
1813 } else { |
|
1814 if (strchg->strchange_instrms > mStreams.Length()) { |
|
1815 LOG(("Other side increased streams from %u to %u", |
|
1816 mStreams.Length(), strchg->strchange_instrms)); |
|
1817 } |
|
1818 if (strchg->strchange_outstrms > mStreams.Length() || |
|
1819 strchg->strchange_instrms > mStreams.Length()) { |
|
1820 uint16_t old_len = mStreams.Length(); |
|
1821 uint16_t new_len = std::max(strchg->strchange_outstrms, |
|
1822 strchg->strchange_instrms); |
|
1823 LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)", |
|
1824 old_len, new_len, new_len - old_len, |
|
1825 strchg->strchange_instrms)); |
|
1826 // make sure both are the same length |
|
1827 mStreams.AppendElements(new_len - old_len); |
|
1828 LOG(("New length = %d (was %d)", mStreams.Length(), old_len)); |
|
1829 for (size_t i = old_len; i < mStreams.Length(); ++i) { |
|
1830 mStreams[i] = nullptr; |
|
1831 } |
|
1832 // Re-process any channels waiting for streams. |
|
1833 // Linear search, but we don't increase channels often and |
|
1834 // the array would only get long in case of an app error normally |
|
1835 |
|
1836 // Make sure we request enough streams if there's a big jump in streams |
|
1837 // Could make a more complex API for OpenXxxFinish() and avoid this loop |
|
1838 int32_t num_needed = mPending.GetSize(); |
|
1839 LOG(("%d of %d new streams already needed", num_needed, |
|
1840 new_len - old_len)); |
|
1841 num_needed -= (new_len - old_len); // number we added |
|
1842 if (num_needed > 0) { |
|
1843 if (num_needed < 16) |
|
1844 num_needed = 16; |
|
1845 LOG(("Not enough new streams, asking for %d more", num_needed)); |
|
1846 RequestMoreStreams(num_needed); |
|
1847 } else if (strchg->strchange_outstrms < strchg->strchange_instrms) { |
|
1848 LOG(("Requesting %d output streams to match partner", |
|
1849 strchg->strchange_instrms - strchg->strchange_outstrms)); |
|
1850 RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms); |
|
1851 } |
|
1852 |
|
1853 ProcessQueuedOpens(); |
|
1854 } |
|
1855 // else probably not a change in # of streams |
|
1856 } |
|
1857 |
|
1858 for (i = 0; i < mStreams.Length(); ++i) { |
|
1859 channel = mStreams[i]; |
|
1860 if (!channel) |
|
1861 continue; |
|
1862 |
|
1863 if ((channel->mState == CONNECTING) && |
|
1864 (channel->mStream == INVALID_STREAM)) { |
|
1865 if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) || |
|
1866 (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) { |
|
1867 /* XXX: Signal to the other end. */ |
|
1868 channel->mState = CLOSED; |
|
1869 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
1870 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
|
1871 channel)); |
|
1872 // maybe fire onError (bug 843625) |
|
1873 } else { |
|
1874 stream = FindFreeStream(); |
|
1875 if (stream != INVALID_STREAM) { |
|
1876 channel->mStream = stream; |
|
1877 mStreams[stream] = channel; |
|
1878 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; |
|
1879 /// XXX fix |
|
1880 StartDefer(); |
|
1881 } else { |
|
1882 /* We will not find more ... */ |
|
1883 break; |
|
1884 } |
|
1885 } |
|
1886 } |
|
1887 } |
|
1888 } |
|
1889 |
|
1890 |
|
1891 // Called with mLock locked! |
|
1892 void |
|
1893 DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n) |
|
1894 { |
|
1895 mLock.AssertCurrentThreadOwns(); |
|
1896 if (notif->sn_header.sn_length != (uint32_t)n) { |
|
1897 return; |
|
1898 } |
|
1899 switch (notif->sn_header.sn_type) { |
|
1900 case SCTP_ASSOC_CHANGE: |
|
1901 HandleAssociationChangeEvent(&(notif->sn_assoc_change)); |
|
1902 break; |
|
1903 case SCTP_PEER_ADDR_CHANGE: |
|
1904 HandlePeerAddressChangeEvent(&(notif->sn_paddr_change)); |
|
1905 break; |
|
1906 case SCTP_REMOTE_ERROR: |
|
1907 HandleRemoteErrorEvent(&(notif->sn_remote_error)); |
|
1908 break; |
|
1909 case SCTP_SHUTDOWN_EVENT: |
|
1910 HandleShutdownEvent(&(notif->sn_shutdown_event)); |
|
1911 break; |
|
1912 case SCTP_ADAPTATION_INDICATION: |
|
1913 HandleAdaptationIndication(&(notif->sn_adaptation_event)); |
|
1914 break; |
|
1915 case SCTP_PARTIAL_DELIVERY_EVENT: |
|
1916 LOG(("SCTP_PARTIAL_DELIVERY_EVENT")); |
|
1917 break; |
|
1918 case SCTP_AUTHENTICATION_EVENT: |
|
1919 LOG(("SCTP_AUTHENTICATION_EVENT")); |
|
1920 break; |
|
1921 case SCTP_SENDER_DRY_EVENT: |
|
1922 //LOG(("SCTP_SENDER_DRY_EVENT")); |
|
1923 break; |
|
1924 case SCTP_NOTIFICATIONS_STOPPED_EVENT: |
|
1925 LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT")); |
|
1926 break; |
|
1927 case SCTP_SEND_FAILED_EVENT: |
|
1928 HandleSendFailedEvent(&(notif->sn_send_failed_event)); |
|
1929 break; |
|
1930 case SCTP_STREAM_RESET_EVENT: |
|
1931 HandleStreamResetEvent(&(notif->sn_strreset_event)); |
|
1932 break; |
|
1933 case SCTP_ASSOC_RESET_EVENT: |
|
1934 LOG(("SCTP_ASSOC_RESET_EVENT")); |
|
1935 break; |
|
1936 case SCTP_STREAM_CHANGE_EVENT: |
|
1937 HandleStreamChangeEvent(&(notif->sn_strchange_event)); |
|
1938 break; |
|
1939 default: |
|
1940 LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type)); |
|
1941 break; |
|
1942 } |
|
1943 } |
|
1944 |
|
1945 int |
|
1946 DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen, |
|
1947 struct sctp_rcvinfo rcv, int32_t flags) |
|
1948 { |
|
1949 ASSERT_WEBRTC(!NS_IsMainThread()); |
|
1950 |
|
1951 if (!data) { |
|
1952 usrsctp_close(sock); // SCTP has finished shutting down |
|
1953 } else { |
|
1954 MutexAutoLock lock(mLock); |
|
1955 if (flags & MSG_NOTIFICATION) { |
|
1956 HandleNotification(static_cast<union sctp_notification *>(data), datalen); |
|
1957 } else { |
|
1958 HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid); |
|
1959 } |
|
1960 } |
|
1961 // sctp allocates 'data' with malloc(), and expects the receiver to free |
|
1962 // it (presumably with free). |
|
1963 // XXX future optimization: try to deliver messages without an internal |
|
1964 // alloc/copy, and if so delay the free until later. |
|
1965 free(data); |
|
1966 // usrsctp defines the callback as returning an int, but doesn't use it |
|
1967 return 1; |
|
1968 } |
|
1969 |
|
1970 already_AddRefed<DataChannel> |
|
1971 DataChannelConnection::Open(const nsACString& label, const nsACString& protocol, |
|
1972 Type type, bool inOrder, |
|
1973 uint32_t prValue, DataChannelListener *aListener, |
|
1974 nsISupports *aContext, bool aExternalNegotiated, |
|
1975 uint16_t aStream) |
|
1976 { |
|
1977 // aStream == INVALID_STREAM to have the protocol allocate |
|
1978 uint16_t prPolicy = SCTP_PR_SCTP_NONE; |
|
1979 uint32_t flags; |
|
1980 |
|
1981 LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u", |
|
1982 PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(), |
|
1983 type, inOrder, prValue, aListener, aContext, |
|
1984 aExternalNegotiated ? "true" : "false", aStream)); |
|
1985 switch (type) { |
|
1986 case DATA_CHANNEL_RELIABLE: |
|
1987 prPolicy = SCTP_PR_SCTP_NONE; |
|
1988 break; |
|
1989 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: |
|
1990 prPolicy = SCTP_PR_SCTP_RTX; |
|
1991 break; |
|
1992 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: |
|
1993 prPolicy = SCTP_PR_SCTP_TTL; |
|
1994 break; |
|
1995 } |
|
1996 if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) { |
|
1997 return nullptr; |
|
1998 } |
|
1999 |
|
2000 // Don't look past currently-negotiated streams |
|
2001 if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) { |
|
2002 LOG(("ERROR: external negotiation of already-open channel %u", aStream)); |
|
2003 // XXX How do we indicate this up to the application? Probably the |
|
2004 // caller's job, but we may need to return an error code. |
|
2005 return nullptr; |
|
2006 } |
|
2007 |
|
2008 flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; |
|
2009 nsRefPtr<DataChannel> channel(new DataChannel(this, |
|
2010 aStream, |
|
2011 DataChannel::CONNECTING, |
|
2012 label, protocol, |
|
2013 type, prValue, |
|
2014 flags, |
|
2015 aListener, aContext)); |
|
2016 if (aExternalNegotiated) { |
|
2017 channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED; |
|
2018 } |
|
2019 |
|
2020 MutexAutoLock lock(mLock); // OpenFinish assumes this |
|
2021 return OpenFinish(channel.forget()); |
|
2022 } |
|
2023 |
|
2024 // Separate routine so we can also call it to finish up from pending opens |
|
2025 already_AddRefed<DataChannel> |
|
2026 DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel) |
|
2027 { |
|
2028 nsRefPtr<DataChannel> channel(aChannel); // takes the reference passed in |
|
2029 // Normally 1 reference if called from ::Open(), or 2 if called from |
|
2030 // ProcessQueuedOpens() unless the DOMDataChannel was gc'd |
|
2031 uint16_t stream = channel->mStream; |
|
2032 bool queue = false; |
|
2033 |
|
2034 mLock.AssertCurrentThreadOwns(); |
|
2035 |
|
2036 // Cases we care about: |
|
2037 // Pre-negotiated: |
|
2038 // Not Open: |
|
2039 // Doesn't fit: |
|
2040 // -> change initial ask or renegotiate after open |
|
2041 // -> queue open |
|
2042 // Open: |
|
2043 // Doesn't fit: |
|
2044 // -> RequestMoreStreams && queue |
|
2045 // Does fit: |
|
2046 // -> open |
|
2047 // Not negotiated: |
|
2048 // Not Open: |
|
2049 // -> queue open |
|
2050 // Open: |
|
2051 // -> Try to get a stream |
|
2052 // Doesn't fit: |
|
2053 // -> RequestMoreStreams && queue |
|
2054 // Does fit: |
|
2055 // -> open |
|
2056 // So the Open cases are basically the same |
|
2057 // Not Open cases are simply queue for non-negotiated, and |
|
2058 // either change the initial ask or possibly renegotiate after open. |
|
2059 |
|
2060 if (mState == OPEN) { |
|
2061 if (stream == INVALID_STREAM) { |
|
2062 stream = FindFreeStream(); // may be INVALID_STREAM if we need more |
|
2063 } |
|
2064 if (stream == INVALID_STREAM || stream >= mStreams.Length()) { |
|
2065 // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams |
|
2066 // to avoid going back immediately for more if the ask to N, N+1, etc |
|
2067 int32_t more_needed = (stream == INVALID_STREAM) ? 16 : |
|
2068 (stream-((int32_t)mStreams.Length())) + 16; |
|
2069 if (!RequestMoreStreams(more_needed)) { |
|
2070 // Something bad happened... we're done |
|
2071 goto request_error_cleanup; |
|
2072 } |
|
2073 queue = true; |
|
2074 } |
|
2075 } else { |
|
2076 // not OPEN |
|
2077 if (stream != INVALID_STREAM && stream >= mStreams.Length() && |
|
2078 mState == CLOSED) { |
|
2079 // Update number of streams for init message |
|
2080 struct sctp_initmsg initmsg; |
|
2081 socklen_t len = sizeof(initmsg); |
|
2082 int32_t total_needed = stream+16; |
|
2083 |
|
2084 memset(&initmsg, 0, sizeof(initmsg)); |
|
2085 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { |
|
2086 LOG(("*** failed getsockopt SCTP_INITMSG")); |
|
2087 goto request_error_cleanup; |
|
2088 } |
|
2089 LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed, |
|
2090 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); |
|
2091 initmsg.sinit_num_ostreams = total_needed; |
|
2092 initmsg.sinit_max_instreams = MAX_NUM_STREAMS; |
|
2093 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, |
|
2094 (socklen_t)sizeof(initmsg)) < 0) { |
|
2095 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); |
|
2096 goto request_error_cleanup; |
|
2097 } |
|
2098 |
|
2099 int32_t old_len = mStreams.Length(); |
|
2100 mStreams.AppendElements(total_needed - old_len); |
|
2101 for (int32_t i = old_len; i < total_needed; ++i) { |
|
2102 mStreams[i] = nullptr; |
|
2103 } |
|
2104 } |
|
2105 // else if state is CONNECTING, we'll just re-negotiate when OpenFinish |
|
2106 // is called, if needed |
|
2107 queue = true; |
|
2108 } |
|
2109 if (queue) { |
|
2110 LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream)); |
|
2111 // Also serves to mark we told the app |
|
2112 channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN; |
|
2113 channel->AddRef(); // we need a ref for the nsDeQue and one to return |
|
2114 mPending.Push(channel); |
|
2115 return channel.forget(); |
|
2116 } |
|
2117 |
|
2118 MOZ_ASSERT(stream != INVALID_STREAM); |
|
2119 // just allocated (& OPEN), or externally negotiated |
|
2120 mStreams[stream] = channel; // holds a reference |
|
2121 channel->mStream = stream; |
|
2122 |
|
2123 #ifdef TEST_QUEUED_DATA |
|
2124 // It's painful to write a test for this... |
|
2125 channel->mState = OPEN; |
|
2126 channel->mReady = true; |
|
2127 SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST); |
|
2128 #endif |
|
2129 |
|
2130 if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) { |
|
2131 // Don't send unordered until this gets cleared |
|
2132 channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK; |
|
2133 } |
|
2134 |
|
2135 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { |
|
2136 if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol, |
|
2137 stream, |
|
2138 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), |
|
2139 channel->mPrPolicy, channel->mPrValue)) { |
|
2140 LOG(("SendOpenRequest failed, errno = %d", errno)); |
|
2141 if (errno == EAGAIN || errno == EWOULDBLOCK) { |
|
2142 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; |
|
2143 StartDefer(); |
|
2144 |
|
2145 return channel.forget(); |
|
2146 } else { |
|
2147 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { |
|
2148 // We already returned the channel to the app. |
|
2149 NS_ERROR("Failed to send open request"); |
|
2150 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
2151 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
|
2152 channel)); |
|
2153 } |
|
2154 // If we haven't returned the channel yet, it will get destroyed when we exit |
|
2155 // this function. |
|
2156 mStreams[stream] = nullptr; |
|
2157 channel->mStream = INVALID_STREAM; |
|
2158 // we'll be destroying the channel |
|
2159 channel->mState = CLOSED; |
|
2160 return nullptr; |
|
2161 } |
|
2162 /* NOTREACHED */ |
|
2163 } |
|
2164 } |
|
2165 // Either externally negotiated or we sent Open |
|
2166 channel->mState = OPEN; |
|
2167 channel->mReady = true; |
|
2168 // FIX? Move into DOMDataChannel? I don't think we can send it yet here |
|
2169 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); |
|
2170 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
2171 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, |
|
2172 channel)); |
|
2173 |
|
2174 return channel.forget(); |
|
2175 |
|
2176 request_error_cleanup: |
|
2177 channel->mState = CLOSED; |
|
2178 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { |
|
2179 // We already returned the channel to the app. |
|
2180 NS_ERROR("Failed to request more streams"); |
|
2181 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
2182 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, |
|
2183 channel)); |
|
2184 return channel.forget(); |
|
2185 } |
|
2186 // we'll be destroying the channel, but it never really got set up |
|
2187 // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and |
|
2188 // Dispatch it to ourselves |
|
2189 return nullptr; |
|
2190 } |
|
2191 |
|
2192 int32_t |
|
2193 DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data, |
|
2194 uint32_t length, uint32_t ppid) |
|
2195 { |
|
2196 uint16_t flags; |
|
2197 struct sctp_sendv_spa spa; |
|
2198 int32_t result; |
|
2199 |
|
2200 NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0); |
|
2201 NS_WARN_IF_FALSE(length > 0, "Length is 0?!"); |
|
2202 |
|
2203 // To avoid problems where an in-order OPEN is lost and an |
|
2204 // out-of-order data message "beats" it, require data to be in-order |
|
2205 // until we get an ACK. |
|
2206 if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && |
|
2207 !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) { |
|
2208 flags = SCTP_UNORDERED; |
|
2209 } else { |
|
2210 flags = 0; |
|
2211 } |
|
2212 |
|
2213 spa.sendv_sndinfo.snd_ppid = htonl(ppid); |
|
2214 spa.sendv_sndinfo.snd_sid = channel->mStream; |
|
2215 spa.sendv_sndinfo.snd_flags = flags; |
|
2216 spa.sendv_sndinfo.snd_context = 0; |
|
2217 spa.sendv_sndinfo.snd_assoc_id = 0; |
|
2218 spa.sendv_flags = SCTP_SEND_SNDINFO_VALID; |
|
2219 |
|
2220 if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) { |
|
2221 spa.sendv_prinfo.pr_policy = channel->mPrPolicy; |
|
2222 spa.sendv_prinfo.pr_value = channel->mPrValue; |
|
2223 spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; |
|
2224 } |
|
2225 |
|
2226 // Note: Main-thread IO, but doesn't block! |
|
2227 // XXX FIX! to deal with heavy overruns of JS trying to pass data in |
|
2228 // (more than the buffersize) queue data onto another thread to do the |
|
2229 // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp |
|
2230 |
|
2231 // SCTP will return EMSGSIZE if the message is bigger than the buffer |
|
2232 // size (or EAGAIN if there isn't space) |
|
2233 if (channel->mBufferedData.IsEmpty()) { |
|
2234 result = usrsctp_sendv(mSocket, data, length, |
|
2235 nullptr, 0, |
|
2236 (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa), |
|
2237 SCTP_SENDV_SPA, 0); |
|
2238 LOG(("Sent buffer (len=%u), result=%d", length, result)); |
|
2239 } else { |
|
2240 // Fake EAGAIN if we're already buffering data |
|
2241 result = -1; |
|
2242 errno = EAGAIN; |
|
2243 } |
|
2244 if (result < 0) { |
|
2245 if (errno == EAGAIN || errno == EWOULDBLOCK) { |
|
2246 // queue data for resend! And queue any further data for the stream until it is... |
|
2247 BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc |
|
2248 channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array |
|
2249 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA; |
|
2250 LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length)); |
|
2251 StartDefer(); |
|
2252 return 0; |
|
2253 } |
|
2254 LOG(("error %d sending string", errno)); |
|
2255 } |
|
2256 return result; |
|
2257 } |
|
2258 |
|
2259 // Handles fragmenting binary messages |
|
2260 int32_t |
|
2261 DataChannelConnection::SendBinary(DataChannel *channel, const char *data, |
|
2262 uint32_t len, |
|
2263 uint32_t ppid_partial, uint32_t ppid_final) |
|
2264 { |
|
2265 // Since there's a limit on network buffer size and no limits on message |
|
2266 // size, and we don't want to use EOR mode (multiple writes for a |
|
2267 // message, but all other streams are blocked until you finish sending |
|
2268 // this message), we need to add application-level fragmentation of large |
|
2269 // messages. On a reliable channel, these can be simply rebuilt into a |
|
2270 // large message. On an unreliable channel, we can't and don't know how |
|
2271 // long to wait, and there are no retransmissions, and no easy way to |
|
2272 // tell the user "this part is missing", so on unreliable channels we |
|
2273 // need to return an error if sending more bytes than the network buffers |
|
2274 // can hold, and perhaps a lower number. |
|
2275 |
|
2276 // We *really* don't want to do this from main thread! - and SendMsgInternal |
|
2277 // avoids blocking. |
|
2278 // This MUST be reliable and in-order for the reassembly to work |
|
2279 if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT && |
|
2280 channel->mPrPolicy == DATA_CHANNEL_RELIABLE && |
|
2281 !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) { |
|
2282 int32_t sent=0; |
|
2283 uint32_t origlen = len; |
|
2284 LOG(("Sending binary message length %u in chunks", len)); |
|
2285 // XXX check flags for out-of-order, or force in-order for large binary messages |
|
2286 while (len > 0) { |
|
2287 uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT); |
|
2288 uint32_t ppid; |
|
2289 len -= sendlen; |
|
2290 ppid = len > 0 ? ppid_partial : ppid_final; |
|
2291 LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid)); |
|
2292 // Note that these might end up being deferred and queued. |
|
2293 sent += SendMsgInternal(channel, data, sendlen, ppid); |
|
2294 data += sendlen; |
|
2295 } |
|
2296 LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued", |
|
2297 (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT, |
|
2298 origlen, sent, |
|
2299 channel->mBufferedData.Length())); |
|
2300 return sent; |
|
2301 } |
|
2302 NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT, |
|
2303 "Sending too-large data on unreliable channel!"); |
|
2304 |
|
2305 // This will fail if the message is too large (default 256K) |
|
2306 return SendMsgInternal(channel, data, len, ppid_final); |
|
2307 } |
|
2308 |
|
2309 class ReadBlobRunnable : public nsRunnable { |
|
2310 public: |
|
2311 ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream, |
|
2312 nsIInputStream* aBlob) : |
|
2313 mConnection(aConnection), |
|
2314 mStream(aStream), |
|
2315 mBlob(aBlob) |
|
2316 { } |
|
2317 |
|
2318 NS_IMETHODIMP Run() { |
|
2319 // ReadBlob() is responsible to releasing the reference |
|
2320 DataChannelConnection *self = mConnection; |
|
2321 self->ReadBlob(mConnection.forget(), mStream, mBlob); |
|
2322 return NS_OK; |
|
2323 } |
|
2324 |
|
2325 private: |
|
2326 // Make sure the Connection doesn't die while there are jobs outstanding. |
|
2327 // Let it die (if released by PeerConnectionImpl while we're running) |
|
2328 // when we send our runnable back to MainThread. Then ~DataChannelConnection |
|
2329 // can send the IOThread to MainThread to die in a runnable, avoiding |
|
2330 // unsafe event loop recursion. Evil. |
|
2331 nsRefPtr<DataChannelConnection> mConnection; |
|
2332 uint16_t mStream; |
|
2333 // Use RefCount for preventing the object is deleted when SendBlob returns. |
|
2334 nsRefPtr<nsIInputStream> mBlob; |
|
2335 }; |
|
2336 |
|
2337 int32_t |
|
2338 DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob) |
|
2339 { |
|
2340 DataChannel *channel = mStreams[stream]; |
|
2341 NS_ENSURE_TRUE(channel, 0); |
|
2342 // Spawn a thread to send the data |
|
2343 if (!mInternalIOThread) { |
|
2344 nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread)); |
|
2345 if (NS_FAILED(res)) { |
|
2346 return -1; |
|
2347 } |
|
2348 } |
|
2349 |
|
2350 nsCOMPtr<nsIRunnable> runnable = new ReadBlobRunnable(this, stream, aBlob); |
|
2351 mInternalIOThread->Dispatch(runnable, NS_DISPATCH_NORMAL); |
|
2352 return 0; |
|
2353 } |
|
2354 |
|
2355 void |
|
2356 DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis, |
|
2357 uint16_t aStream, nsIInputStream* aBlob) |
|
2358 { |
|
2359 // NOTE: 'aThis' has been forgotten by the caller to avoid releasing |
|
2360 // it off mainthread; if PeerConnectionImpl has released then we want |
|
2361 // ~DataChannelConnection() to run on MainThread |
|
2362 |
|
2363 // XXX to do this safely, we must enqueue these atomically onto the |
|
2364 // output socket. We need a sender thread(s?) to enque data into the |
|
2365 // socket and to avoid main-thread IO that might block. Even on a |
|
2366 // background thread, we may not want to block on one stream's data. |
|
2367 // I.e. run non-blocking and service multiple channels. |
|
2368 |
|
2369 // For now as a hack, send as a single blast of queued packets which may |
|
2370 // be deferred until buffer space is available. |
|
2371 nsCString temp; |
|
2372 uint64_t len; |
|
2373 nsCOMPtr<nsIThread> mainThread; |
|
2374 NS_GetMainThread(getter_AddRefs(mainThread)); |
|
2375 |
|
2376 if (NS_FAILED(aBlob->Available(&len)) || |
|
2377 NS_FAILED(NS_ReadInputStreamToString(aBlob, temp, len))) { |
|
2378 // Bug 966602: Doesn't return an error to the caller via onerror. |
|
2379 // We must release DataChannelConnection on MainThread to avoid issues (bug 876167) |
|
2380 NS_ProxyRelease(mainThread, aThis.take()); |
|
2381 return; |
|
2382 } |
|
2383 aBlob->Close(); |
|
2384 RUN_ON_THREAD(mainThread, WrapRunnable(nsRefPtr<DataChannelConnection>(aThis), |
|
2385 &DataChannelConnection::SendBinaryMsg, |
|
2386 aStream, temp), |
|
2387 NS_DISPATCH_NORMAL); |
|
2388 } |
|
2389 |
|
2390 void |
|
2391 DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList) |
|
2392 { |
|
2393 ASSERT_WEBRTC(NS_IsMainThread()); |
|
2394 for (uint32_t i = 0; i < mStreams.Length(); ++i) { |
|
2395 if (mStreams[i]) { |
|
2396 aStreamList->push_back(mStreams[i]->mStream); |
|
2397 } |
|
2398 } |
|
2399 } |
|
2400 |
|
2401 int32_t |
|
2402 DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg, |
|
2403 bool isBinary) |
|
2404 { |
|
2405 ASSERT_WEBRTC(NS_IsMainThread()); |
|
2406 // We really could allow this from other threads, so long as we deal with |
|
2407 // asynchronosity issues with channels closing, in particular access to |
|
2408 // mStreams, and issues with the association closing (access to mSocket). |
|
2409 |
|
2410 const char *data = aMsg.BeginReading(); |
|
2411 uint32_t len = aMsg.Length(); |
|
2412 DataChannel *channel; |
|
2413 |
|
2414 LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len)); |
|
2415 // XXX if we want more efficiency, translate flags once at open time |
|
2416 channel = mStreams[stream]; |
|
2417 NS_ENSURE_TRUE(channel, 0); |
|
2418 |
|
2419 if (isBinary) |
|
2420 return SendBinary(channel, data, len, |
|
2421 DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST); |
|
2422 return SendBinary(channel, data, len, |
|
2423 DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST); |
|
2424 } |
|
2425 |
|
2426 void |
|
2427 DataChannelConnection::Close(DataChannel *aChannel) |
|
2428 { |
|
2429 MutexAutoLock lock(mLock); |
|
2430 CloseInt(aChannel); |
|
2431 } |
|
2432 |
|
2433 // So we can call Close() with the lock already held |
|
2434 // Called from someone who holds a ref via ::Close(), or from ~DataChannel |
|
2435 void |
|
2436 DataChannelConnection::CloseInt(DataChannel *aChannel) |
|
2437 { |
|
2438 MOZ_ASSERT(aChannel); |
|
2439 nsRefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us |
|
2440 |
|
2441 mLock.AssertCurrentThreadOwns(); |
|
2442 LOG(("Connection %p/Channel %p: Closing stream %u", |
|
2443 channel->mConnection.get(), channel.get(), channel->mStream)); |
|
2444 // re-test since it may have closed before the lock was grabbed |
|
2445 if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) { |
|
2446 LOG(("Channel already closing/closed (%u)", aChannel->mState)); |
|
2447 if (mState == CLOSED && channel->mStream != INVALID_STREAM) { |
|
2448 // called from CloseAll() |
|
2449 // we're not going to hang around waiting any more |
|
2450 mStreams[channel->mStream] = nullptr; |
|
2451 } |
|
2452 return; |
|
2453 } |
|
2454 aChannel->mBufferedData.Clear(); |
|
2455 if (channel->mStream != INVALID_STREAM) { |
|
2456 ResetOutgoingStream(channel->mStream); |
|
2457 if (mState == CLOSED) { // called from CloseAll() |
|
2458 // Let resets accumulate then send all at once in CloseAll() |
|
2459 // we're not going to hang around waiting |
|
2460 mStreams[channel->mStream] = nullptr; |
|
2461 } else { |
|
2462 SendOutgoingStreamReset(); |
|
2463 } |
|
2464 } |
|
2465 aChannel->mState = CLOSING; |
|
2466 if (mState == CLOSED) { |
|
2467 // we're not going to hang around waiting |
|
2468 channel->Destroy(); |
|
2469 } |
|
2470 // At this point when we leave here, the object is a zombie held alive only by the DOM object |
|
2471 } |
|
2472 |
|
2473 void DataChannelConnection::CloseAll() |
|
2474 { |
|
2475 LOG(("Closing all channels (connection %p)", (void*) this)); |
|
2476 // Don't need to lock here |
|
2477 |
|
2478 // Make sure no more channels will be opened |
|
2479 { |
|
2480 MutexAutoLock lock(mLock); |
|
2481 mState = CLOSED; |
|
2482 } |
|
2483 |
|
2484 // Close current channels |
|
2485 // If there are runnables, they hold a strong ref and keep the channel |
|
2486 // and/or connection alive (even if in a CLOSED state) |
|
2487 bool closed_some = false; |
|
2488 for (uint32_t i = 0; i < mStreams.Length(); ++i) { |
|
2489 if (mStreams[i]) { |
|
2490 mStreams[i]->Close(); |
|
2491 closed_some = true; |
|
2492 } |
|
2493 } |
|
2494 |
|
2495 // Clean up any pending opens for channels |
|
2496 nsRefPtr<DataChannel> channel; |
|
2497 while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) { |
|
2498 LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream)); |
|
2499 channel->Close(); // also releases the ref on each iteration |
|
2500 closed_some = true; |
|
2501 } |
|
2502 // It's more efficient to let the Resets queue in shutdown and then |
|
2503 // SendOutgoingStreamReset() here. |
|
2504 if (closed_some) { |
|
2505 MutexAutoLock lock(mLock); |
|
2506 SendOutgoingStreamReset(); |
|
2507 } |
|
2508 } |
|
2509 |
|
2510 DataChannel::~DataChannel() |
|
2511 { |
|
2512 // NS_ASSERTION since this is more "I think I caught all the cases that |
|
2513 // can cause this" than a true kill-the-program assertion. If this is |
|
2514 // wrong, nothing bad happens. A worst it's a leak. |
|
2515 NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel"); |
|
2516 } |
|
2517 |
|
2518 void |
|
2519 DataChannel::Close() |
|
2520 { |
|
2521 ENSURE_DATACONNECTION; |
|
2522 mConnection->Close(this); |
|
2523 } |
|
2524 |
|
2525 // Used when disconnecting from the DataChannelConnection |
|
2526 void |
|
2527 DataChannel::Destroy() |
|
2528 { |
|
2529 ENSURE_DATACONNECTION; |
|
2530 |
|
2531 LOG(("Destroying Data channel %u", mStream)); |
|
2532 MOZ_ASSERT_IF(mStream != INVALID_STREAM, |
|
2533 !mConnection->FindChannelByStream(mStream)); |
|
2534 mStream = INVALID_STREAM; |
|
2535 mState = CLOSED; |
|
2536 mConnection = nullptr; |
|
2537 } |
|
2538 |
|
2539 void |
|
2540 DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext) |
|
2541 { |
|
2542 MutexAutoLock mLock(mListenerLock); |
|
2543 mContext = aContext; |
|
2544 mListener = aListener; |
|
2545 } |
|
2546 |
|
2547 // May be called from another (i.e. Main) thread! |
|
2548 void |
|
2549 DataChannel::AppReady() |
|
2550 { |
|
2551 ENSURE_DATACONNECTION; |
|
2552 |
|
2553 MutexAutoLock lock(mConnection->mLock); |
|
2554 |
|
2555 mReady = true; |
|
2556 if (mState == WAITING_TO_OPEN) { |
|
2557 mState = OPEN; |
|
2558 NS_DispatchToMainThread(new DataChannelOnMessageAvailable( |
|
2559 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection, |
|
2560 this)); |
|
2561 for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) { |
|
2562 nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i]; |
|
2563 MOZ_ASSERT(runnable); |
|
2564 NS_DispatchToMainThread(runnable); |
|
2565 } |
|
2566 } else { |
|
2567 NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN"); |
|
2568 } |
|
2569 mQueuedMessages.Clear(); |
|
2570 mQueuedMessages.Compact(); |
|
2571 // We never use it again... We could even allocate the array in the odd |
|
2572 // cases we need it. |
|
2573 } |
|
2574 |
|
2575 uint32_t |
|
2576 DataChannel::GetBufferedAmount() |
|
2577 { |
|
2578 uint32_t buffered = 0; |
|
2579 for (uint32_t i = 0; i < mBufferedData.Length(); ++i) { |
|
2580 buffered += mBufferedData[i]->mLength; |
|
2581 } |
|
2582 return buffered; |
|
2583 } |
|
2584 |
|
2585 // Called with mLock locked! |
|
2586 void |
|
2587 DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage) |
|
2588 { |
|
2589 if (!mReady && |
|
2590 (mState == CONNECTING || mState == WAITING_TO_OPEN)) { |
|
2591 mQueuedMessages.AppendElement(aMessage); |
|
2592 } else { |
|
2593 NS_DispatchToMainThread(aMessage); |
|
2594 } |
|
2595 } |
|
2596 |
|
2597 } // namespace mozilla |