michael@0: /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* vim: set ts=2 et sw=2 tw=80: */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this file, michael@0: * You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include michael@0: #include michael@0: #if !defined(__Userspace_os_Windows) michael@0: #include michael@0: #endif michael@0: // usrsctp.h expects to have errno definitions prior to its inclusion. michael@0: #include michael@0: michael@0: #define SCTP_DEBUG 1 michael@0: #define SCTP_STDINT_INCLUDE michael@0: michael@0: #ifdef _MSC_VER michael@0: // Disable "warning C4200: nonstandard extension used : zero-sized array in michael@0: // struct/union" michael@0: // ...which the third-party file usrsctp.h runs afoul of. michael@0: #pragma warning(push) michael@0: #pragma warning(disable:4200) michael@0: #endif michael@0: michael@0: #include "usrsctp.h" michael@0: michael@0: #ifdef _MSC_VER michael@0: #pragma warning(pop) michael@0: #endif michael@0: michael@0: #include "DataChannelLog.h" michael@0: michael@0: #include "nsServiceManagerUtils.h" michael@0: #include "nsIObserverService.h" michael@0: #include "nsIObserver.h" michael@0: #include "mozilla/Services.h" michael@0: #include "nsProxyRelease.h" michael@0: #include "nsThread.h" michael@0: #include "nsThreadUtils.h" michael@0: #include "nsAutoPtr.h" michael@0: #include "nsNetUtil.h" michael@0: #include "mozilla/StaticPtr.h" michael@0: #ifdef MOZ_PEERCONNECTION michael@0: #include "mtransport/runnable_utils.h" michael@0: #endif michael@0: michael@0: #define DATACHANNEL_LOG(args) LOG(args) michael@0: #include "DataChannel.h" michael@0: #include "DataChannelProtocol.h" michael@0: michael@0: #ifdef PR_LOGGING michael@0: PRLogModuleInfo* michael@0: GetDataChannelLog() michael@0: { michael@0: static PRLogModuleInfo* sLog; michael@0: if (!sLog) michael@0: sLog = PR_NewLogModule("DataChannel"); michael@0: return sLog; michael@0: } michael@0: michael@0: PRLogModuleInfo* michael@0: GetSCTPLog() michael@0: { michael@0: static PRLogModuleInfo* sLog; michael@0: if (!sLog) michael@0: sLog = PR_NewLogModule("SCTP"); michael@0: return sLog; michael@0: } michael@0: #endif michael@0: michael@0: // Let us turn on and off important assertions in non-debug builds michael@0: #ifdef DEBUG michael@0: #define ASSERT_WEBRTC(x) MOZ_ASSERT((x)) michael@0: #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS) michael@0: #define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0) michael@0: #endif michael@0: michael@0: static bool sctp_initialized; michael@0: michael@0: namespace mozilla { michael@0: michael@0: class DataChannelShutdown; michael@0: StaticRefPtr gDataChannelShutdown; michael@0: michael@0: class DataChannelShutdown : public nsIObserver michael@0: { michael@0: public: michael@0: // This needs to be tied to some form object that is guaranteed to be michael@0: // around (singleton likely) unless we want to shutdown sctp whenever michael@0: // we're not using it (and in which case we'd keep a refcnt'd object michael@0: // ref'd by each DataChannelConnection to release the SCTP usrlib via michael@0: // sctp_finish) michael@0: michael@0: NS_DECL_ISUPPORTS michael@0: michael@0: DataChannelShutdown() {} michael@0: michael@0: void Init() michael@0: { michael@0: nsCOMPtr observerService = michael@0: mozilla::services::GetObserverService(); michael@0: if (!observerService) michael@0: return; michael@0: michael@0: nsresult rv = observerService->AddObserver(this, michael@0: "profile-change-net-teardown", michael@0: false); michael@0: MOZ_ASSERT(rv == NS_OK); michael@0: (void) rv; michael@0: } michael@0: michael@0: virtual ~DataChannelShutdown() michael@0: { michael@0: nsCOMPtr observerService = michael@0: mozilla::services::GetObserverService(); michael@0: if (observerService) michael@0: observerService->RemoveObserver(this, "profile-change-net-teardown"); michael@0: } michael@0: michael@0: NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic, michael@0: const char16_t* aData) { michael@0: if (strcmp(aTopic, "profile-change-net-teardown") == 0) { michael@0: LOG(("Shutting down SCTP")); michael@0: if (sctp_initialized) { michael@0: usrsctp_finish(); michael@0: sctp_initialized = false; michael@0: } michael@0: nsCOMPtr observerService = michael@0: mozilla::services::GetObserverService(); michael@0: if (!observerService) michael@0: return NS_ERROR_FAILURE; michael@0: michael@0: nsresult rv = observerService->RemoveObserver(this, michael@0: "profile-change-net-teardown"); michael@0: MOZ_ASSERT(rv == NS_OK); michael@0: (void) rv; michael@0: michael@0: nsRefPtr kungFuDeathGrip(this); michael@0: gDataChannelShutdown = nullptr; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: }; michael@0: michael@0: NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver); michael@0: michael@0: michael@0: BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data, michael@0: uint32_t length) : mLength(length) michael@0: { michael@0: mSpa = new sctp_sendv_spa; michael@0: *mSpa = spa; michael@0: char *tmp = new char[length]; // infallible malloc! michael@0: memcpy(tmp, data, length); michael@0: mData = tmp; michael@0: } michael@0: michael@0: BufferedMsg::~BufferedMsg() michael@0: { michael@0: delete mSpa; michael@0: delete mData; michael@0: } michael@0: michael@0: static int michael@0: receive_cb(struct socket* sock, union sctp_sockstore addr, michael@0: void *data, size_t datalen, michael@0: struct sctp_rcvinfo rcv, int flags, void *ulp_info) michael@0: { michael@0: DataChannelConnection *connection = static_cast(ulp_info); michael@0: return connection->ReceiveCallback(sock, data, datalen, rcv, flags); michael@0: } michael@0: michael@0: #ifdef PR_LOGGING michael@0: static void michael@0: debug_printf(const char *format, ...) michael@0: { michael@0: va_list ap; michael@0: char buffer[1024]; michael@0: michael@0: if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) { michael@0: va_start(ap, format); michael@0: #ifdef _WIN32 michael@0: if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) { michael@0: #else michael@0: if (vsnprintf(buffer, sizeof(buffer), format, ap) > 0) { michael@0: #endif michael@0: PR_LogPrint("%s", buffer); michael@0: } michael@0: va_end(ap); michael@0: } michael@0: } michael@0: #endif michael@0: michael@0: DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) : michael@0: mLock("netwerk::sctp::DataChannelConnection") michael@0: { michael@0: mState = CLOSED; michael@0: mSocket = nullptr; michael@0: mMasterSocket = nullptr; michael@0: mListener = listener->asWeakPtr(); michael@0: mLocalPort = 0; michael@0: mRemotePort = 0; michael@0: mDeferTimeout = 10; michael@0: mTimerRunning = false; michael@0: LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get())); michael@0: mInternalIOThread = nullptr; michael@0: } michael@0: michael@0: DataChannelConnection::~DataChannelConnection() michael@0: { michael@0: LOG(("Deleting DataChannelConnection %p", (void *) this)); michael@0: // This may die on the MainThread, or on the STS thread michael@0: ASSERT_WEBRTC(mState == CLOSED); michael@0: MOZ_ASSERT(!mMasterSocket); michael@0: MOZ_ASSERT(mPending.GetSize() == 0); michael@0: michael@0: // Already disconnected from sigslot/mTransportFlow michael@0: // TransportFlows must be released from the STS thread michael@0: if (!IsSTSThread()) { michael@0: ASSERT_WEBRTC(NS_IsMainThread()); michael@0: if (mTransportFlow) { michael@0: ASSERT_WEBRTC(mSTS); michael@0: NS_ProxyRelease(mSTS, mTransportFlow); michael@0: } michael@0: michael@0: if (mInternalIOThread) { michael@0: // Avoid spinning the event thread from here (which if we're mainthread michael@0: // is in the event loop already) michael@0: NS_DispatchToMainThread(WrapRunnable(nsCOMPtr(mInternalIOThread), michael@0: &nsIThread::Shutdown), michael@0: NS_DISPATCH_NORMAL); michael@0: } michael@0: } else { michael@0: // on STS, safe to call shutdown michael@0: if (mInternalIOThread) { michael@0: mInternalIOThread->Shutdown(); michael@0: } michael@0: } michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::Destroy() michael@0: { michael@0: // Though it's probably ok to do this and close the sockets; michael@0: // if we really want it to do true clean shutdowns it can michael@0: // create a dependant Internal object that would remain around michael@0: // until the network shut down the association or timed out. michael@0: LOG(("Destroying DataChannelConnection %p", (void *) this)); michael@0: ASSERT_WEBRTC(NS_IsMainThread()); michael@0: CloseAll(); michael@0: michael@0: MutexAutoLock lock(mLock); michael@0: // If we had a pending reset, we aren't waiting for it - clear the list so michael@0: // we can deregister this DataChannelConnection without leaking. michael@0: ClearResets(); michael@0: michael@0: MOZ_ASSERT(mSTS); michael@0: ASSERT_WEBRTC(NS_IsMainThread()); michael@0: // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed, michael@0: // the usrsctp_close() calls can move back here (and just proxy the michael@0: // disconnect_all()) michael@0: RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr(this), michael@0: &DataChannelConnection::DestroyOnSTS, michael@0: mSocket, mMasterSocket), michael@0: NS_DISPATCH_NORMAL); michael@0: michael@0: // These will be released on STS michael@0: mSocket = nullptr; michael@0: mMasterSocket = nullptr; // also a flag that we've Destroyed this connection michael@0: michael@0: // Must do this in Destroy() since we may then delete this object michael@0: if (mUsingDtls) { michael@0: usrsctp_deregister_address(static_cast(this)); michael@0: LOG(("Deregistered %p from the SCTP stack.", static_cast(this))); michael@0: } michael@0: michael@0: // We can't get any more new callbacks from the SCTP library michael@0: // All existing callbacks have refs to DataChannelConnection michael@0: michael@0: // nsDOMDataChannel objects have refs to DataChannels that have refs to us michael@0: } michael@0: michael@0: void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket, michael@0: struct socket *aSocket) michael@0: { michael@0: if (aSocket && aSocket != aMasterSocket) michael@0: usrsctp_close(aSocket); michael@0: if (aMasterSocket) michael@0: usrsctp_close(aMasterSocket); michael@0: michael@0: disconnect_all(); michael@0: } michael@0: michael@0: NS_IMPL_ISUPPORTS(DataChannelConnection, michael@0: nsITimerCallback) michael@0: michael@0: bool michael@0: DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls) michael@0: { michael@0: struct sctp_initmsg initmsg; michael@0: struct sctp_udpencaps encaps; michael@0: struct sctp_assoc_value av; michael@0: struct sctp_event event; michael@0: socklen_t len; michael@0: michael@0: uint16_t event_types[] = {SCTP_ASSOC_CHANGE, michael@0: SCTP_PEER_ADDR_CHANGE, michael@0: SCTP_REMOTE_ERROR, michael@0: SCTP_SHUTDOWN_EVENT, michael@0: SCTP_ADAPTATION_INDICATION, michael@0: SCTP_SEND_FAILED_EVENT, michael@0: SCTP_STREAM_RESET_EVENT, michael@0: SCTP_STREAM_CHANGE_EVENT}; michael@0: { michael@0: ASSERT_WEBRTC(NS_IsMainThread()); michael@0: michael@0: // MutexAutoLock lock(mLock); Not needed since we're on mainthread always michael@0: if (!sctp_initialized) { michael@0: if (aUsingDtls) { michael@0: LOG(("sctp_init(DTLS)")); michael@0: #ifdef MOZ_PEERCONNECTION michael@0: usrsctp_init(0, michael@0: DataChannelConnection::SctpDtlsOutput, michael@0: #ifdef PR_LOGGING michael@0: debug_printf michael@0: #else michael@0: nullptr michael@0: #endif michael@0: ); michael@0: #else michael@0: NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport"); michael@0: #endif michael@0: } else { michael@0: LOG(("sctp_init(%u)", aPort)); michael@0: usrsctp_init(aPort, michael@0: nullptr, michael@0: #ifdef PR_LOGGING michael@0: debug_printf michael@0: #else michael@0: nullptr michael@0: #endif michael@0: ); michael@0: } michael@0: michael@0: #ifdef PR_LOGGING michael@0: // Set logging to SCTP:PR_LOG_DEBUG to get SCTP debugs michael@0: if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_ALWAYS)) { michael@0: usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); michael@0: } michael@0: #endif michael@0: usrsctp_sysctl_set_sctp_blackhole(2); michael@0: // ECN is currently not supported by the Firefox code michael@0: usrsctp_sysctl_set_sctp_ecn_enable(0); michael@0: sctp_initialized = true; michael@0: michael@0: gDataChannelShutdown = new DataChannelShutdown(); michael@0: gDataChannelShutdown->Init(); michael@0: } michael@0: } michael@0: michael@0: // XXX FIX! make this a global we get once michael@0: // Find the STS thread michael@0: nsresult rv; michael@0: mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); michael@0: MOZ_ASSERT(NS_SUCCEEDED(rv)); michael@0: michael@0: // Open sctp with a callback michael@0: if ((mMasterSocket = usrsctp_socket( michael@0: aUsingDtls ? AF_CONN : AF_INET, michael@0: SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) { michael@0: return false; michael@0: } michael@0: michael@0: // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking michael@0: // in associations for normal IO michael@0: if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) { michael@0: LOG(("Couldn't set non_blocking on SCTP socket")); michael@0: // We can't handle connect() safely if it will block, not that this will michael@0: // even happen. michael@0: goto error_cleanup; michael@0: } michael@0: michael@0: // Make sure when we close the socket, make sure it doesn't call us back again! michael@0: // This would cause it try to use an invalid DataChannelConnection pointer michael@0: struct linger l; michael@0: l.l_onoff = 1; michael@0: l.l_linger = 0; michael@0: if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, michael@0: (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { michael@0: LOG(("Couldn't set SO_LINGER on SCTP socket")); michael@0: // unsafe to allow it to continue if this fails michael@0: goto error_cleanup; michael@0: } michael@0: michael@0: // XXX Consider disabling this when we add proper SDP negotiation. michael@0: // We may want to leave enabled for supporting 'cloning' of SDP offers, which michael@0: // implies re-use of the same pseudo-port number, or forcing a renegotiation. michael@0: { michael@0: uint32_t on = 1; michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT, michael@0: (const void *)&on, (socklen_t)sizeof(on)) < 0) { michael@0: LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket")); michael@0: } michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY, michael@0: (const void *)&on, (socklen_t)sizeof(on)) < 0) { michael@0: LOG(("Couldn't set SCTP_NODELAY on SCTP socket")); michael@0: } michael@0: } michael@0: michael@0: if (!aUsingDtls) { michael@0: memset(&encaps, 0, sizeof(encaps)); michael@0: encaps.sue_address.ss_family = AF_INET; michael@0: encaps.sue_port = htons(aPort); michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT, michael@0: (const void*)&encaps, michael@0: (socklen_t)sizeof(struct sctp_udpencaps)) < 0) { michael@0: LOG(("*** failed encaps errno %d", errno)); michael@0: goto error_cleanup; michael@0: } michael@0: LOG(("SCTP encapsulation local port %d", aPort)); michael@0: } michael@0: michael@0: av.assoc_id = SCTP_ALL_ASSOC; michael@0: av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, michael@0: (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { michael@0: LOG(("*** failed enable stream reset errno %d", errno)); michael@0: goto error_cleanup; michael@0: } michael@0: michael@0: /* Enable the events of interest. */ michael@0: memset(&event, 0, sizeof(event)); michael@0: event.se_assoc_id = SCTP_ALL_ASSOC; michael@0: event.se_on = 1; michael@0: for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) { michael@0: event.se_type = event_types[i]; michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) { michael@0: LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno)); michael@0: goto error_cleanup; michael@0: } michael@0: } michael@0: michael@0: // Update number of streams michael@0: mStreams.AppendElements(aNumStreams); michael@0: for (uint32_t i = 0; i < aNumStreams; ++i) { michael@0: mStreams[i] = nullptr; michael@0: } michael@0: memset(&initmsg, 0, sizeof(initmsg)); michael@0: len = sizeof(initmsg); michael@0: if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { michael@0: LOG(("*** failed getsockopt SCTP_INITMSG")); michael@0: goto error_cleanup; michael@0: } michael@0: LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams, michael@0: initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); michael@0: initmsg.sinit_num_ostreams = aNumStreams; michael@0: initmsg.sinit_max_instreams = MAX_NUM_STREAMS; michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, michael@0: (socklen_t)sizeof(initmsg)) < 0) { michael@0: LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); michael@0: goto error_cleanup; michael@0: } michael@0: michael@0: mSocket = nullptr; michael@0: if (aUsingDtls) { michael@0: mUsingDtls = true; michael@0: usrsctp_register_address(static_cast(this)); michael@0: LOG(("Registered %p within the SCTP stack.", static_cast(this))); michael@0: } else { michael@0: mUsingDtls = false; michael@0: } michael@0: return true; michael@0: michael@0: error_cleanup: michael@0: usrsctp_close(mMasterSocket); michael@0: mMasterSocket = nullptr; michael@0: mUsingDtls = false; michael@0: return false; michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::StartDefer() michael@0: { michael@0: nsresult rv; michael@0: if (!NS_IsMainThread()) { michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::START_DEFER, michael@0: this, (DataChannel *) nullptr)); michael@0: return; michael@0: } michael@0: michael@0: ASSERT_WEBRTC(NS_IsMainThread()); michael@0: if (!mDeferredTimer) { michael@0: mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); michael@0: MOZ_ASSERT(mDeferredTimer); michael@0: } michael@0: michael@0: if (!mTimerRunning) { michael@0: rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, michael@0: nsITimer::TYPE_ONE_SHOT); michael@0: NS_ENSURE_TRUE_VOID(rv == NS_OK); michael@0: michael@0: mTimerRunning = true; michael@0: } michael@0: } michael@0: michael@0: // nsITimerCallback michael@0: michael@0: NS_IMETHODIMP michael@0: DataChannelConnection::Notify(nsITimer *timer) michael@0: { michael@0: ASSERT_WEBRTC(NS_IsMainThread()); michael@0: LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout)); michael@0: michael@0: if (timer == mDeferredTimer) { michael@0: if (SendDeferredMessages()) { michael@0: // Still blocked michael@0: // we don't need a lock, since this must be main thread... michael@0: nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, michael@0: nsITimer::TYPE_ONE_SHOT); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("%s: cannot initialize open timer", __FUNCTION__)); michael@0: // XXX and do....? michael@0: return rv; michael@0: } michael@0: mTimerRunning = true; michael@0: } else { michael@0: LOG(("Turned off deferred send timer")); michael@0: mTimerRunning = false; michael@0: } michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: #ifdef MOZ_PEERCONNECTION michael@0: void michael@0: DataChannelConnection::SetEvenOdd() michael@0: { michael@0: ASSERT_WEBRTC(IsSTSThread()); michael@0: michael@0: TransportLayerDtls *dtls = static_cast( michael@0: mTransportFlow->GetLayer(TransportLayerDtls::ID())); michael@0: MOZ_ASSERT(dtls); // DTLS is mandatory michael@0: mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT); michael@0: } michael@0: michael@0: bool michael@0: DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport) michael@0: { michael@0: LOG(("Connect DTLS local %u, remote %u", localport, remoteport)); michael@0: michael@0: NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!"); michael@0: NS_ENSURE_TRUE(aFlow, false); michael@0: michael@0: mTransportFlow = aFlow; michael@0: mLocalPort = localport; michael@0: mRemotePort = remoteport; michael@0: mState = CONNECTING; michael@0: michael@0: RUN_ON_THREAD(mSTS, WrapRunnable(nsRefPtr(this), michael@0: &DataChannelConnection::SetSignals), michael@0: NS_DISPATCH_NORMAL); michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::SetSignals() michael@0: { michael@0: ASSERT_WEBRTC(IsSTSThread()); michael@0: ASSERT_WEBRTC(mTransportFlow); michael@0: LOG(("Setting transport signals, state: %d", mTransportFlow->state())); michael@0: mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput); michael@0: // SignalStateChange() doesn't call you with the initial state michael@0: mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect); michael@0: CompleteConnect(mTransportFlow, mTransportFlow->state()); michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state) michael@0: { michael@0: LOG(("Data transport state: %d", state)); michael@0: MutexAutoLock lock(mLock); michael@0: ASSERT_WEBRTC(IsSTSThread()); michael@0: // We should abort connection on TS_ERROR. michael@0: // Note however that the association will also fail (perhaps with a delay) and michael@0: // notify us in that way michael@0: if (state != TransportLayer::TS_OPEN || !mMasterSocket) michael@0: return; michael@0: michael@0: struct sockaddr_conn addr; michael@0: memset(&addr, 0, sizeof(addr)); michael@0: addr.sconn_family = AF_CONN; michael@0: #if defined(__Userspace_os_Darwin) michael@0: addr.sconn_len = sizeof(addr); michael@0: #endif michael@0: addr.sconn_port = htons(mLocalPort); michael@0: addr.sconn_addr = static_cast(this); michael@0: michael@0: LOG(("Calling usrsctp_bind")); michael@0: int r = usrsctp_bind(mMasterSocket, reinterpret_cast(&addr), michael@0: sizeof(addr)); michael@0: if (r < 0) { michael@0: LOG(("usrsctp_bind failed: %d", r)); michael@0: } else { michael@0: // This is the remote addr michael@0: addr.sconn_port = htons(mRemotePort); michael@0: LOG(("Calling usrsctp_connect")); michael@0: r = usrsctp_connect(mMasterSocket, reinterpret_cast(&addr), michael@0: sizeof(addr)); michael@0: if (r < 0) { michael@0: if (errno == EINPROGRESS) { michael@0: // non-blocking michael@0: return; michael@0: } else { michael@0: LOG(("usrsctp_connect failed: %d", errno)); michael@0: mState = CLOSED; michael@0: } michael@0: } else { michael@0: // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that michael@0: // This also avoids issues with calling TransportFlow stuff on Mainthread michael@0: return; michael@0: } michael@0: } michael@0: // Note: currently this doesn't actually notify the application michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CONNECTION, michael@0: this, false)); michael@0: return; michael@0: } michael@0: michael@0: // Process any pending Opens michael@0: void michael@0: DataChannelConnection::ProcessQueuedOpens() michael@0: { michael@0: // The nsDeque holds channels with an AddRef applied. Another reference michael@0: // (may) be held by the DOMDataChannel, unless it's been GC'd. No other michael@0: // references should exist. michael@0: michael@0: // Can't copy nsDeque's. Move into temp array since any that fail will michael@0: // go back to mPending michael@0: nsDeque temp; michael@0: DataChannel *temp_channel; // really already_AddRefed<> michael@0: while (nullptr != (temp_channel = static_cast(mPending.PopFront()))) { michael@0: temp.Push(static_cast(temp_channel)); michael@0: } michael@0: michael@0: nsRefPtr channel; michael@0: // All these entries have an AddRef(); make that explicit now via the dont_AddRef() michael@0: while (nullptr != (channel = dont_AddRef(static_cast(temp.PopFront())))) { michael@0: if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { michael@0: LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream)); michael@0: channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN; michael@0: // OpenFinish returns a reference itself, so we need to take it can Release it michael@0: channel = OpenFinish(channel.forget()); // may reset the flag and re-push michael@0: } else { michael@0: NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?"); michael@0: } michael@0: } michael@0: michael@0: } michael@0: void michael@0: DataChannelConnection::SctpDtlsInput(TransportFlow *flow, michael@0: const unsigned char *data, size_t len) michael@0: { michael@0: #ifdef PR_LOGGING michael@0: if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) { michael@0: char *buf; michael@0: michael@0: if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) { michael@0: PR_LogPrint("%s", buf); michael@0: usrsctp_freedumpbuffer(buf); michael@0: } michael@0: } michael@0: #endif michael@0: // Pass the data to SCTP michael@0: usrsctp_conninput(static_cast(this), data, len, 0); michael@0: } michael@0: michael@0: int michael@0: DataChannelConnection::SendPacket(const unsigned char *data, size_t len, bool release) michael@0: { michael@0: //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len)); michael@0: int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0; michael@0: if (release) michael@0: delete data; michael@0: return res; michael@0: } michael@0: michael@0: /* static */ michael@0: int michael@0: DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length, michael@0: uint8_t tos, uint8_t set_df) michael@0: { michael@0: DataChannelConnection *peer = static_cast(addr); michael@0: int res; michael@0: michael@0: #ifdef PR_LOGGING michael@0: if (PR_LOG_TEST(GetSCTPLog(), PR_LOG_DEBUG)) { michael@0: char *buf; michael@0: michael@0: if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) { michael@0: PR_LogPrint("%s", buf); michael@0: usrsctp_freedumpbuffer(buf); michael@0: } michael@0: } michael@0: #endif michael@0: // We're async proxying even if on the STSThread because this is called michael@0: // with internal SCTP locks held in some cases (such as in usrsctp_connect()). michael@0: // SCTP has an option for Apple, on IP connections only, to release at least michael@0: // one of the locks before calling a packet output routine; with changes to michael@0: // the underlying SCTP stack this might remove the need to use an async proxy. michael@0: if (0 /*peer->IsSTSThread()*/) { michael@0: res = peer->SendPacket(static_cast(buffer), length, false); michael@0: } else { michael@0: unsigned char *data = new unsigned char[length]; michael@0: memcpy(data, buffer, length); michael@0: res = -1; michael@0: // XXX It might be worthwhile to add an assertion against the thread michael@0: // somehow getting into the DataChannel/SCTP code again, as michael@0: // DISPATCH_SYNC is not fully blocking. This may be tricky, as it michael@0: // needs to be a per-thread check, not a global. michael@0: peer->mSTS->Dispatch(WrapRunnable( michael@0: nsRefPtr(peer), michael@0: &DataChannelConnection::SendPacket, data, length, true), michael@0: NS_DISPATCH_NORMAL); michael@0: res = 0; // cheat! Packets can always be dropped later anyways michael@0: } michael@0: return res; michael@0: } michael@0: #endif michael@0: michael@0: #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT michael@0: // listen for incoming associations michael@0: // Blocks! - Don't call this from main thread! michael@0: michael@0: #error This code will not work as-is since SetEvenOdd() runs on Mainthread michael@0: michael@0: bool michael@0: DataChannelConnection::Listen(unsigned short port) michael@0: { michael@0: struct sockaddr_in addr; michael@0: socklen_t addr_len; michael@0: michael@0: NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); michael@0: michael@0: /* Acting as the 'server' */ michael@0: memset((void *)&addr, 0, sizeof(addr)); michael@0: #ifdef HAVE_SIN_LEN michael@0: addr.sin_len = sizeof(struct sockaddr_in); michael@0: #endif michael@0: addr.sin_family = AF_INET; michael@0: addr.sin_port = htons(port); michael@0: addr.sin_addr.s_addr = htonl(INADDR_ANY); michael@0: LOG(("Waiting for connections on port %u", ntohs(addr.sin_port))); michael@0: mState = CONNECTING; michael@0: if (usrsctp_bind(mMasterSocket, reinterpret_cast(&addr), sizeof(struct sockaddr_in)) < 0) { michael@0: LOG(("***Failed userspace_bind")); michael@0: return false; michael@0: } michael@0: if (usrsctp_listen(mMasterSocket, 1) < 0) { michael@0: LOG(("***Failed userspace_listen")); michael@0: return false; michael@0: } michael@0: michael@0: LOG(("Accepting connection")); michael@0: addr_len = 0; michael@0: if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) { michael@0: LOG(("***Failed accept")); michael@0: return false; michael@0: } michael@0: mState = OPEN; michael@0: michael@0: struct linger l; michael@0: l.l_onoff = 1; michael@0: l.l_linger = 0; michael@0: if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, michael@0: (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) { michael@0: LOG(("Couldn't set SO_LINGER on SCTP socket")); michael@0: } michael@0: michael@0: SetEvenOdd(); michael@0: michael@0: // Notify Connection open michael@0: // XXX We need to make sure connection sticks around until the message is delivered michael@0: LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CONNECTION, michael@0: this, (DataChannel *) nullptr)); michael@0: return true; michael@0: } michael@0: michael@0: // Blocks! - Don't call this from main thread! michael@0: bool michael@0: DataChannelConnection::Connect(const char *addr, unsigned short port) michael@0: { michael@0: struct sockaddr_in addr4; michael@0: struct sockaddr_in6 addr6; michael@0: michael@0: NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); michael@0: michael@0: /* Acting as the connector */ michael@0: LOG(("Connecting to %s, port %u", addr, port)); michael@0: memset((void *)&addr4, 0, sizeof(struct sockaddr_in)); michael@0: memset((void *)&addr6, 0, sizeof(struct sockaddr_in6)); michael@0: #ifdef HAVE_SIN_LEN michael@0: addr4.sin_len = sizeof(struct sockaddr_in); michael@0: #endif michael@0: #ifdef HAVE_SIN6_LEN michael@0: addr6.sin6_len = sizeof(struct sockaddr_in6); michael@0: #endif michael@0: addr4.sin_family = AF_INET; michael@0: addr6.sin6_family = AF_INET6; michael@0: addr4.sin_port = htons(port); michael@0: addr6.sin6_port = htons(port); michael@0: mState = CONNECTING; michael@0: michael@0: #if !defined(__Userspace_os_Windows) michael@0: if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) { michael@0: if (usrsctp_connect(mMasterSocket, reinterpret_cast(&addr6), sizeof(struct sockaddr_in6)) < 0) { michael@0: LOG(("*** Failed userspace_connect")); michael@0: return false; michael@0: } michael@0: } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) { michael@0: if (usrsctp_connect(mMasterSocket, reinterpret_cast(&addr4), sizeof(struct sockaddr_in)) < 0) { michael@0: LOG(("*** Failed userspace_connect")); michael@0: return false; michael@0: } michael@0: } else { michael@0: LOG(("*** Illegal destination address.")); michael@0: } michael@0: #else michael@0: { michael@0: struct sockaddr_storage ss; michael@0: int sslen = sizeof(ss); michael@0: michael@0: if (!WSAStringToAddressA(const_cast(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) { michael@0: addr6.sin6_addr = (reinterpret_cast(&ss))->sin6_addr; michael@0: if (usrsctp_connect(mMasterSocket, reinterpret_cast(&addr6), sizeof(struct sockaddr_in6)) < 0) { michael@0: LOG(("*** Failed userspace_connect")); michael@0: return false; michael@0: } michael@0: } else if (!WSAStringToAddressA(const_cast(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) { michael@0: addr4.sin_addr = (reinterpret_cast(&ss))->sin_addr; michael@0: if (usrsctp_connect(mMasterSocket, reinterpret_cast(&addr4), sizeof(struct sockaddr_in)) < 0) { michael@0: LOG(("*** Failed userspace_connect")); michael@0: return false; michael@0: } michael@0: } else { michael@0: LOG(("*** Illegal destination address.")); michael@0: } michael@0: } michael@0: #endif michael@0: michael@0: mSocket = mMasterSocket; michael@0: michael@0: LOG(("connect() succeeded! Entering connected mode")); michael@0: mState = OPEN; michael@0: michael@0: SetEvenOdd(); michael@0: michael@0: // Notify Connection open michael@0: // XXX We need to make sure connection sticks around until the message is delivered michael@0: LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CONNECTION, michael@0: this, (DataChannel *) nullptr)); michael@0: return true; michael@0: } michael@0: #endif michael@0: michael@0: DataChannel * michael@0: DataChannelConnection::FindChannelByStream(uint16_t stream) michael@0: { michael@0: return mStreams.SafeElementAt(stream); michael@0: } michael@0: michael@0: uint16_t michael@0: DataChannelConnection::FindFreeStream() michael@0: { michael@0: uint32_t i, j, limit; michael@0: michael@0: limit = mStreams.Length(); michael@0: if (limit > MAX_NUM_STREAMS) michael@0: limit = MAX_NUM_STREAMS; michael@0: michael@0: for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) { michael@0: if (!mStreams[i]) { michael@0: // Verify it's not still in the process of closing michael@0: for (j = 0; j < mStreamsResetting.Length(); ++j) { michael@0: if (mStreamsResetting[j] == i) { michael@0: break; michael@0: } michael@0: } michael@0: if (j == mStreamsResetting.Length()) michael@0: break; michael@0: } michael@0: } michael@0: if (i >= limit) { michael@0: return INVALID_STREAM; michael@0: } michael@0: return i; michael@0: } michael@0: michael@0: bool michael@0: DataChannelConnection::RequestMoreStreams(int32_t aNeeded) michael@0: { michael@0: struct sctp_status status; michael@0: struct sctp_add_streams sas; michael@0: uint32_t outStreamsNeeded; michael@0: socklen_t len; michael@0: michael@0: if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) { michael@0: aNeeded = MAX_NUM_STREAMS - mStreams.Length(); michael@0: } michael@0: if (aNeeded <= 0) { michael@0: return false; michael@0: } michael@0: michael@0: len = (socklen_t)sizeof(struct sctp_status); michael@0: if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) { michael@0: LOG(("***failed: getsockopt SCTP_STATUS")); michael@0: return false; michael@0: } michael@0: outStreamsNeeded = aNeeded; // number to add michael@0: michael@0: // Note: if multiple channel opens happen when we don't have enough space, michael@0: // we'll call RequestMoreStreams() multiple times michael@0: memset(&sas, 0, sizeof(sas)); michael@0: sas.sas_instrms = 0; michael@0: sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */ michael@0: // Doesn't block, we get an event when it succeeds or fails michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, michael@0: (socklen_t) sizeof(struct sctp_add_streams)) < 0) { michael@0: if (errno == EALREADY) { michael@0: LOG(("Already have %u output streams", outStreamsNeeded)); michael@0: return true; michael@0: } michael@0: michael@0: LOG(("***failed: setsockopt ADD errno=%d", errno)); michael@0: return false; michael@0: } michael@0: LOG(("Requested %u more streams", outStreamsNeeded)); michael@0: // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the michael@0: // values are larger than mStreams.Length() michael@0: return true; michael@0: } michael@0: michael@0: int32_t michael@0: DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream) michael@0: { michael@0: struct sctp_sndinfo sndinfo; michael@0: michael@0: // Note: Main-thread IO, but doesn't block michael@0: memset(&sndinfo, 0, sizeof(struct sctp_sndinfo)); michael@0: sndinfo.snd_sid = stream; michael@0: sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); michael@0: if (usrsctp_sendv(mSocket, msg, len, nullptr, 0, michael@0: &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), michael@0: SCTP_SENDV_SNDINFO, 0) < 0) { michael@0: //LOG(("***failed: sctp_sendv")); don't log because errno is a return! michael@0: return (0); michael@0: } michael@0: return (1); michael@0: } michael@0: michael@0: int32_t michael@0: DataChannelConnection::SendOpenAckMessage(uint16_t stream) michael@0: { michael@0: struct rtcweb_datachannel_ack ack; michael@0: michael@0: memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack)); michael@0: ack.msg_type = DATA_CHANNEL_ACK; michael@0: michael@0: return SendControlMessage(&ack, sizeof(ack), stream); michael@0: } michael@0: michael@0: int32_t michael@0: DataChannelConnection::SendOpenRequestMessage(const nsACString& label, michael@0: const nsACString& protocol, michael@0: uint16_t stream, bool unordered, michael@0: uint16_t prPolicy, uint32_t prValue) michael@0: { michael@0: int label_len = label.Length(); // not including nul michael@0: int proto_len = protocol.Length(); // not including nul michael@0: struct rtcweb_datachannel_open_request *req = michael@0: (struct rtcweb_datachannel_open_request*) moz_xmalloc((sizeof(*req)-1) + label_len + proto_len); michael@0: // careful - request includes 1 char label michael@0: michael@0: memset(req, 0, sizeof(struct rtcweb_datachannel_open_request)); michael@0: req->msg_type = DATA_CHANNEL_OPEN_REQUEST; michael@0: switch (prPolicy) { michael@0: case SCTP_PR_SCTP_NONE: michael@0: req->channel_type = DATA_CHANNEL_RELIABLE; michael@0: break; michael@0: case SCTP_PR_SCTP_TTL: michael@0: req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED; michael@0: break; michael@0: case SCTP_PR_SCTP_RTX: michael@0: req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT; michael@0: break; michael@0: default: michael@0: // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno! michael@0: moz_free(req); michael@0: return (0); michael@0: } michael@0: if (unordered) { michael@0: // Per the current types, all differ by 0x80 between ordered and unordered michael@0: req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future michael@0: } michael@0: michael@0: req->reliability_param = htonl(prValue); michael@0: req->priority = htons(0); /* XXX: add support */ michael@0: req->label_length = htons(label_len); michael@0: req->protocol_length = htons(proto_len); michael@0: memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len); michael@0: memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len); michael@0: michael@0: // sizeof(*req) already includes +1 byte for label, need nul for both strings michael@0: int32_t result = SendControlMessage(req, (sizeof(*req)-1) + label_len + proto_len, stream); michael@0: michael@0: moz_free(req); michael@0: return result; michael@0: } michael@0: michael@0: // XXX This should use a separate thread (outbound queue) which should michael@0: // select() to know when to *try* to send data to the socket again. michael@0: // Alternatively, it can use a timeout, but that's guaranteed to be wrong michael@0: // (just not sure in what direction). We could re-implement NSPR's michael@0: // PR_POLL_WRITE/etc handling... with a lot of work. michael@0: michael@0: // Better yet, use the SCTP stack's notifications on buffer state to avoid michael@0: // filling the SCTP's buffers. michael@0: michael@0: // returns if we're still blocked or not michael@0: bool michael@0: DataChannelConnection::SendDeferredMessages() michael@0: { michael@0: uint32_t i; michael@0: nsRefPtr channel; // we may null out the refs to this michael@0: bool still_blocked = false; michael@0: bool sent = false; michael@0: michael@0: // This may block while something is modifying channels, but should not block for IO michael@0: MutexAutoLock lock(mLock); michael@0: michael@0: // XXX For total fairness, on a still_blocked we'd start next time at the michael@0: // same index. Sorry, not going to bother for now. michael@0: for (i = 0; i < mStreams.Length(); ++i) { michael@0: channel = mStreams[i]; michael@0: if (!channel) michael@0: continue; michael@0: michael@0: // Only one of these should be set.... michael@0: if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) { michael@0: if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol, michael@0: channel->mStream, michael@0: channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED, michael@0: channel->mPrPolicy, channel->mPrValue)) { michael@0: channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ; michael@0: michael@0: channel->mState = OPEN; michael@0: channel->mReady = true; michael@0: LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, michael@0: channel)); michael@0: sent = true; michael@0: } else { michael@0: if (errno == EAGAIN || errno == EWOULDBLOCK) { michael@0: still_blocked = true; michael@0: } else { michael@0: // Close the channel, inform the user michael@0: mStreams[channel->mStream] = nullptr; michael@0: channel->mState = CLOSED; michael@0: // Don't need to reset; we didn't open it michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, michael@0: channel)); michael@0: } michael@0: } michael@0: } michael@0: if (still_blocked) michael@0: break; michael@0: michael@0: if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) { michael@0: if (SendOpenAckMessage(channel->mStream)) { michael@0: channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK; michael@0: sent = true; michael@0: } else { michael@0: if (errno == EAGAIN || errno == EWOULDBLOCK) { michael@0: still_blocked = true; michael@0: } else { michael@0: // Close the channel, inform the user michael@0: CloseInt(channel); michael@0: // XXX send error via DataChannelOnMessageAvailable (bug 843625) michael@0: } michael@0: } michael@0: } michael@0: if (still_blocked) michael@0: break; michael@0: michael@0: if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) { michael@0: bool failed_send = false; michael@0: int32_t result; michael@0: michael@0: if (channel->mState == CLOSED || channel->mState == CLOSING) { michael@0: channel->mBufferedData.Clear(); michael@0: } michael@0: while (!channel->mBufferedData.IsEmpty() && michael@0: !failed_send) { michael@0: struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa; michael@0: const char *data = channel->mBufferedData[0]->mData; michael@0: uint32_t len = channel->mBufferedData[0]->mLength; michael@0: michael@0: // SCTP will return EMSGSIZE if the message is bigger than the buffer michael@0: // size (or EAGAIN if there isn't space) michael@0: if ((result = usrsctp_sendv(mSocket, data, len, michael@0: nullptr, 0, michael@0: (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa), michael@0: SCTP_SENDV_SPA, michael@0: 0) < 0)) { michael@0: if (errno == EAGAIN || errno == EWOULDBLOCK) { michael@0: // leave queued for resend michael@0: failed_send = true; michael@0: LOG(("queue full again when resending %d bytes (%d)", len, result)); michael@0: } else { michael@0: LOG(("error %d re-sending string", errno)); michael@0: failed_send = true; michael@0: } michael@0: } else { michael@0: LOG(("Resent buffer of %d bytes (%d)", len, result)); michael@0: sent = true; michael@0: channel->mBufferedData.RemoveElementAt(0); michael@0: } michael@0: } michael@0: if (channel->mBufferedData.IsEmpty()) michael@0: channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA; michael@0: else michael@0: still_blocked = true; michael@0: } michael@0: if (still_blocked) michael@0: break; michael@0: } michael@0: michael@0: if (!still_blocked) { michael@0: // mDeferTimeout becomes an estimate of how long we need to wait next time we block michael@0: return false; michael@0: } michael@0: // adjust time? More time for next wait if we didn't send anything, less if did michael@0: // Pretty crude, but better than nothing; just to keep CPU use down michael@0: if (!sent && mDeferTimeout < 50) michael@0: mDeferTimeout++; michael@0: else if (sent && mDeferTimeout > 10) michael@0: mDeferTimeout--; michael@0: michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, michael@0: size_t length, michael@0: uint16_t stream) michael@0: { michael@0: nsRefPtr channel; michael@0: uint32_t prValue; michael@0: uint16_t prPolicy; michael@0: uint32_t flags; michael@0: michael@0: mLock.AssertCurrentThreadOwns(); michael@0: michael@0: if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) { michael@0: LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length, michael@0: (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))); michael@0: if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) michael@0: return; michael@0: } michael@0: michael@0: LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req))); michael@0: michael@0: switch (req->channel_type) { michael@0: case DATA_CHANNEL_RELIABLE: michael@0: case DATA_CHANNEL_RELIABLE_UNORDERED: michael@0: prPolicy = SCTP_PR_SCTP_NONE; michael@0: break; michael@0: case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: michael@0: case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED: michael@0: prPolicy = SCTP_PR_SCTP_RTX; michael@0: break; michael@0: case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: michael@0: case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED: michael@0: prPolicy = SCTP_PR_SCTP_TTL; michael@0: break; michael@0: default: michael@0: LOG(("Unknown channel type", req->channel_type)); michael@0: /* XXX error handling */ michael@0: return; michael@0: } michael@0: prValue = ntohl(req->reliability_param); michael@0: flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; michael@0: michael@0: if ((channel = FindChannelByStream(stream))) { michael@0: if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { michael@0: LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.", michael@0: stream, channel->mState)); michael@0: /* XXX: some error handling */ michael@0: } else { michael@0: LOG(("Open for externally negotiated channel %u", stream)); michael@0: // XXX should also check protocol, maybe label michael@0: if (prPolicy != channel->mPrPolicy || michael@0: prValue != channel->mPrValue || michael@0: flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) michael@0: { michael@0: LOG(("WARNING: external negotiation mismatch with OpenRequest:" michael@0: "channel %u, policy %u/%u, value %u/%u, flags %x/%x", michael@0: stream, prPolicy, channel->mPrPolicy, michael@0: prValue, channel->mPrValue, flags, channel->mFlags)); michael@0: } michael@0: } michael@0: return; michael@0: } michael@0: if (stream >= mStreams.Length()) { michael@0: LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length())); michael@0: return; michael@0: } michael@0: michael@0: nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length))); michael@0: nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)], michael@0: ntohs(req->protocol_length))); michael@0: michael@0: channel = new DataChannel(this, michael@0: stream, michael@0: DataChannel::CONNECTING, michael@0: label, michael@0: protocol, michael@0: prPolicy, prValue, michael@0: flags, michael@0: nullptr, nullptr); michael@0: mStreams[stream] = channel; michael@0: michael@0: channel->mState = DataChannel::WAITING_TO_OPEN; michael@0: michael@0: LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__, michael@0: channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState)); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, michael@0: this, channel)); michael@0: michael@0: LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); michael@0: michael@0: if (!SendOpenAckMessage(stream)) { michael@0: // XXX Only on EAGAIN!? And if not, then close the channel?? michael@0: channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK; michael@0: StartDefer(); michael@0: } michael@0: michael@0: // Now process any queued data messages for the channel (which will michael@0: // themselves likely get queued until we leave WAITING_TO_OPEN, plus any michael@0: // more that come in before that happens) michael@0: DeliverQueuedData(stream); michael@0: } michael@0: michael@0: // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. michael@0: // That would make this code moot. Keep it for now for backwards compatibility. michael@0: void michael@0: DataChannelConnection::DeliverQueuedData(uint16_t stream) michael@0: { michael@0: mLock.AssertCurrentThreadOwns(); michael@0: michael@0: uint32_t i = 0; michael@0: while (i < mQueuedData.Length()) { michael@0: // Careful! we may modify the array length from within the loop! michael@0: if (mQueuedData[i]->mStream == stream) { michael@0: LOG(("Delivering queued data for stream %u, length %u", michael@0: stream, mQueuedData[i]->mLength)); michael@0: // Deliver the queued data michael@0: HandleDataMessage(mQueuedData[i]->mPpid, michael@0: mQueuedData[i]->mData, mQueuedData[i]->mLength, michael@0: mQueuedData[i]->mStream); michael@0: mQueuedData.RemoveElementAt(i); michael@0: continue; // don't bump index since we removed the element michael@0: } michael@0: i++; michael@0: } michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, michael@0: size_t length, uint16_t stream) michael@0: { michael@0: DataChannel *channel; michael@0: michael@0: mLock.AssertCurrentThreadOwns(); michael@0: michael@0: channel = FindChannelByStream(stream); michael@0: NS_ENSURE_TRUE_VOID(channel); michael@0: michael@0: LOG(("OpenAck received for stream %u, waiting=%d", stream, michael@0: (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0)); michael@0: michael@0: channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK; michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream) michael@0: { michael@0: /* XXX: Send an error message? */ michael@0: LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream)); michael@0: // XXX Log to JS error console if possible michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleDataMessage(uint32_t ppid, michael@0: const void *data, size_t length, michael@0: uint16_t stream) michael@0: { michael@0: DataChannel *channel; michael@0: const char *buffer = (const char *) data; michael@0: michael@0: mLock.AssertCurrentThreadOwns(); michael@0: michael@0: channel = FindChannelByStream(stream); michael@0: michael@0: // XXX A closed channel may trip this... check michael@0: // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK. michael@0: // That would make this code moot. Keep it for now for backwards compatibility. michael@0: if (!channel) { michael@0: // In the updated 0-RTT open case, the sender can send data immediately michael@0: // after Open, and doesn't set the in-order bit (since we don't have a michael@0: // response or ack). Also, with external negotiation, data can come in michael@0: // before we're told about the external negotiation. We need to buffer michael@0: // data until either a) Open comes in, if the ordering get messed up, michael@0: // or b) the app tells us this channel was externally negotiated. When michael@0: // these occur, we deliver the data. michael@0: michael@0: // Since this is rare and non-performance, keep a single list of queued michael@0: // data messages to deliver once the channel opens. michael@0: LOG(("Queuing data for stream %u, length %u", stream, length)); michael@0: // Copies data michael@0: mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length)); michael@0: return; michael@0: } michael@0: michael@0: // XXX should this be a simple if, no warnings/debugbreaks? michael@0: NS_ENSURE_TRUE_VOID(channel->mState != CLOSED); michael@0: michael@0: { michael@0: nsAutoCString recvData(buffer, length); // copies (<64) or allocates michael@0: bool is_binary = true; michael@0: michael@0: if (ppid == DATA_CHANNEL_PPID_DOMSTRING || michael@0: ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) { michael@0: is_binary = false; michael@0: } michael@0: if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) { michael@0: NS_WARNING("DataChannel message aborted by fragment type change!"); michael@0: channel->mRecvBuffer.Truncate(0); michael@0: } michael@0: channel->mIsRecvBinary = is_binary; michael@0: michael@0: switch (ppid) { michael@0: case DATA_CHANNEL_PPID_DOMSTRING: michael@0: case DATA_CHANNEL_PPID_BINARY: michael@0: channel->mRecvBuffer += recvData; michael@0: LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u", michael@0: is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(), michael@0: channel->mStream)); michael@0: return; // Not ready to notify application michael@0: michael@0: case DATA_CHANNEL_PPID_DOMSTRING_LAST: michael@0: LOG(("DataChannel: String message received of length %lu on channel %u", michael@0: length, channel->mStream)); michael@0: if (!channel->mRecvBuffer.IsEmpty()) { michael@0: channel->mRecvBuffer += recvData; michael@0: LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel)); michael@0: channel->SendOrQueue(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_DATA, this, michael@0: channel, channel->mRecvBuffer, -1)); michael@0: channel->mRecvBuffer.Truncate(0); michael@0: return; michael@0: } michael@0: // else send using recvData normally michael@0: length = -1; // Flag for DOMString michael@0: michael@0: // WebSockets checks IsUTF8() here; we can try to deliver it michael@0: break; michael@0: michael@0: case DATA_CHANNEL_PPID_BINARY_LAST: michael@0: LOG(("DataChannel: Received binary message of length %lu on channel id %u", michael@0: length, channel->mStream)); michael@0: if (!channel->mRecvBuffer.IsEmpty()) { michael@0: channel->mRecvBuffer += recvData; michael@0: LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel)); michael@0: channel->SendOrQueue(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_DATA, this, michael@0: channel, channel->mRecvBuffer, michael@0: channel->mRecvBuffer.Length())); michael@0: channel->mRecvBuffer.Truncate(0); michael@0: return; michael@0: } michael@0: // else send using recvData normally michael@0: break; michael@0: michael@0: default: michael@0: NS_ERROR("Unknown data PPID"); michael@0: return; michael@0: } michael@0: /* Notify onmessage */ michael@0: LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel)); michael@0: channel->SendOrQueue(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_DATA, this, michael@0: channel, recvData, length)); michael@0: } michael@0: } michael@0: michael@0: // Called with mLock locked! michael@0: void michael@0: DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream) michael@0: { michael@0: const struct rtcweb_datachannel_open_request *req; michael@0: const struct rtcweb_datachannel_ack *ack; michael@0: michael@0: mLock.AssertCurrentThreadOwns(); michael@0: michael@0: switch (ppid) { michael@0: case DATA_CHANNEL_PPID_CONTROL: michael@0: req = static_cast(buffer); michael@0: michael@0: NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message michael@0: switch (req->msg_type) { michael@0: case DATA_CHANNEL_OPEN_REQUEST: michael@0: // structure includes a possibly-unused char label[1] (in a packed structure) michael@0: NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1); michael@0: michael@0: HandleOpenRequestMessage(req, length, stream); michael@0: break; michael@0: case DATA_CHANNEL_ACK: michael@0: // >= sizeof(*ack) checked above michael@0: michael@0: ack = static_cast(buffer); michael@0: HandleOpenAckMessage(ack, length, stream); michael@0: break; michael@0: default: michael@0: HandleUnknownMessage(ppid, length, stream); michael@0: break; michael@0: } michael@0: break; michael@0: case DATA_CHANNEL_PPID_DOMSTRING: michael@0: case DATA_CHANNEL_PPID_DOMSTRING_LAST: michael@0: case DATA_CHANNEL_PPID_BINARY: michael@0: case DATA_CHANNEL_PPID_BINARY_LAST: michael@0: HandleDataMessage(ppid, buffer, length, stream); michael@0: break; michael@0: default: michael@0: LOG(("Message of length %lu, PPID %u on stream %u received.", michael@0: length, ppid, stream)); michael@0: break; michael@0: } michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac) michael@0: { michael@0: uint32_t i, n; michael@0: michael@0: switch (sac->sac_state) { michael@0: case SCTP_COMM_UP: michael@0: LOG(("Association change: SCTP_COMM_UP")); michael@0: if (mState == CONNECTING) { michael@0: mSocket = mMasterSocket; michael@0: mState = OPEN; michael@0: michael@0: SetEvenOdd(); michael@0: michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CONNECTION, michael@0: this, true)); michael@0: LOG(("DTLS connect() succeeded! Entering connected mode")); michael@0: michael@0: // Open any streams pending... michael@0: ProcessQueuedOpens(); michael@0: michael@0: } else if (mState == OPEN) { michael@0: LOG(("DataConnection Already OPEN")); michael@0: } else { michael@0: LOG(("Unexpected state: %d", mState)); michael@0: } michael@0: break; michael@0: case SCTP_COMM_LOST: michael@0: LOG(("Association change: SCTP_COMM_LOST")); michael@0: // This association is toast, so also close all the channels -- from mainthread! michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_DISCONNECTED, michael@0: this)); michael@0: break; michael@0: case SCTP_RESTART: michael@0: LOG(("Association change: SCTP_RESTART")); michael@0: break; michael@0: case SCTP_SHUTDOWN_COMP: michael@0: LOG(("Association change: SCTP_SHUTDOWN_COMP")); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_DISCONNECTED, michael@0: this)); michael@0: break; michael@0: case SCTP_CANT_STR_ASSOC: michael@0: LOG(("Association change: SCTP_CANT_STR_ASSOC")); michael@0: break; michael@0: default: michael@0: LOG(("Association change: UNKNOWN")); michael@0: break; michael@0: } michael@0: LOG(("Association change: streams (in/out) = (%u/%u)", michael@0: sac->sac_inbound_streams, sac->sac_outbound_streams)); michael@0: michael@0: NS_ENSURE_TRUE_VOID(sac); michael@0: n = sac->sac_length - sizeof(*sac); michael@0: if (((sac->sac_state == SCTP_COMM_UP) || michael@0: (sac->sac_state == SCTP_RESTART)) && (n > 0)) { michael@0: for (i = 0; i < n; ++i) { michael@0: switch (sac->sac_info[i]) { michael@0: case SCTP_ASSOC_SUPPORTS_PR: michael@0: LOG(("Supports: PR")); michael@0: break; michael@0: case SCTP_ASSOC_SUPPORTS_AUTH: michael@0: LOG(("Supports: AUTH")); michael@0: break; michael@0: case SCTP_ASSOC_SUPPORTS_ASCONF: michael@0: LOG(("Supports: ASCONF")); michael@0: break; michael@0: case SCTP_ASSOC_SUPPORTS_MULTIBUF: michael@0: LOG(("Supports: MULTIBUF")); michael@0: break; michael@0: case SCTP_ASSOC_SUPPORTS_RE_CONFIG: michael@0: LOG(("Supports: RE-CONFIG")); michael@0: break; michael@0: default: michael@0: LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i])); michael@0: break; michael@0: } michael@0: } michael@0: } else if (((sac->sac_state == SCTP_COMM_LOST) || michael@0: (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) { michael@0: LOG(("Association: ABORT =")); michael@0: for (i = 0; i < n; ++i) { michael@0: LOG((" 0x%02x", sac->sac_info[i])); michael@0: } michael@0: } michael@0: if ((sac->sac_state == SCTP_CANT_STR_ASSOC) || michael@0: (sac->sac_state == SCTP_SHUTDOWN_COMP) || michael@0: (sac->sac_state == SCTP_COMM_LOST)) { michael@0: return; michael@0: } michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc) michael@0: { michael@0: char addr_buf[INET6_ADDRSTRLEN]; michael@0: const char *addr = ""; michael@0: struct sockaddr_in *sin; michael@0: struct sockaddr_in6 *sin6; michael@0: #if defined(__Userspace_os_Windows) michael@0: DWORD addr_len = INET6_ADDRSTRLEN; michael@0: #endif michael@0: michael@0: switch (spc->spc_aaddr.ss_family) { michael@0: case AF_INET: michael@0: sin = (struct sockaddr_in *)&spc->spc_aaddr; michael@0: #if !defined(__Userspace_os_Windows) michael@0: addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN); michael@0: #else michael@0: if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr, michael@0: addr_buf, &addr_len)) { michael@0: return; michael@0: } michael@0: #endif michael@0: break; michael@0: case AF_INET6: michael@0: sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr; michael@0: #if !defined(__Userspace_os_Windows) michael@0: addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN); michael@0: #else michael@0: if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr, michael@0: addr_buf, &addr_len)) { michael@0: return; michael@0: } michael@0: #endif michael@0: case AF_CONN: michael@0: addr = "DTLS connection"; michael@0: break; michael@0: default: michael@0: break; michael@0: } michael@0: LOG(("Peer address %s is now ", addr)); michael@0: switch (spc->spc_state) { michael@0: case SCTP_ADDR_AVAILABLE: michael@0: LOG(("SCTP_ADDR_AVAILABLE")); michael@0: break; michael@0: case SCTP_ADDR_UNREACHABLE: michael@0: LOG(("SCTP_ADDR_UNREACHABLE")); michael@0: break; michael@0: case SCTP_ADDR_REMOVED: michael@0: LOG(("SCTP_ADDR_REMOVED")); michael@0: break; michael@0: case SCTP_ADDR_ADDED: michael@0: LOG(("SCTP_ADDR_ADDED")); michael@0: break; michael@0: case SCTP_ADDR_MADE_PRIM: michael@0: LOG(("SCTP_ADDR_MADE_PRIM")); michael@0: break; michael@0: case SCTP_ADDR_CONFIRMED: michael@0: LOG(("SCTP_ADDR_CONFIRMED")); michael@0: break; michael@0: default: michael@0: LOG(("UNKNOWN")); michael@0: break; michael@0: } michael@0: LOG((" (error = 0x%08x).\n", spc->spc_error)); michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre) michael@0: { michael@0: size_t i, n; michael@0: michael@0: n = sre->sre_length - sizeof(struct sctp_remote_error); michael@0: LOG(("Remote Error (error = 0x%04x): ", sre->sre_error)); michael@0: for (i = 0; i < n; ++i) { michael@0: LOG((" 0x%02x", sre-> sre_data[i])); michael@0: } michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse) michael@0: { michael@0: LOG(("Shutdown event.")); michael@0: /* XXX: notify all channels. */ michael@0: // Attempts to actually send anything will fail michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai) michael@0: { michael@0: LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind)); michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe) michael@0: { michael@0: size_t i, n; michael@0: michael@0: if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) { michael@0: LOG(("Unsent ")); michael@0: } michael@0: if (ssfe->ssfe_flags & SCTP_DATA_SENT) { michael@0: LOG(("Sent ")); michael@0: } michael@0: if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) { michael@0: LOG(("(flags = %x) ", ssfe->ssfe_flags)); michael@0: } michael@0: LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x", michael@0: ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, michael@0: ssfe->ssfe_info.snd_flags, ssfe->ssfe_error)); michael@0: n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); michael@0: for (i = 0; i < n; ++i) { michael@0: LOG((" 0x%02x", ssfe->ssfe_data[i])); michael@0: } michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::ClearResets() michael@0: { michael@0: // Clear all pending resets michael@0: if (!mStreamsResetting.IsEmpty()) { michael@0: LOG(("Clearing resets for %d streams", mStreamsResetting.Length())); michael@0: } michael@0: michael@0: for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) { michael@0: nsRefPtr channel; michael@0: channel = FindChannelByStream(mStreamsResetting[i]); michael@0: if (channel) { michael@0: LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get())); michael@0: mStreams[channel->mStream] = nullptr; michael@0: } michael@0: } michael@0: mStreamsResetting.Clear(); michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::ResetOutgoingStream(uint16_t stream) michael@0: { michael@0: uint32_t i; michael@0: michael@0: mLock.AssertCurrentThreadOwns(); michael@0: LOG(("Connection %p: Resetting outgoing stream %u", michael@0: (void *) this, stream)); michael@0: // Rarely has more than a couple items and only for a short time michael@0: for (i = 0; i < mStreamsResetting.Length(); ++i) { michael@0: if (mStreamsResetting[i] == stream) { michael@0: return; michael@0: } michael@0: } michael@0: mStreamsResetting.AppendElement(stream); michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::SendOutgoingStreamReset() michael@0: { michael@0: struct sctp_reset_streams *srs; michael@0: uint32_t i; michael@0: size_t len; michael@0: michael@0: LOG(("Connection %p: Sending outgoing stream reset for %d streams", michael@0: (void *) this, mStreamsResetting.Length())); michael@0: mLock.AssertCurrentThreadOwns(); michael@0: if (mStreamsResetting.IsEmpty()) { michael@0: LOG(("No streams to reset")); michael@0: return; michael@0: } michael@0: len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t); michael@0: srs = static_cast (moz_xmalloc(len)); // infallible malloc michael@0: memset(srs, 0, len); michael@0: srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; michael@0: srs->srs_number_streams = mStreamsResetting.Length(); michael@0: for (i = 0; i < mStreamsResetting.Length(); ++i) { michael@0: srs->srs_stream_list[i] = mStreamsResetting[i]; michael@0: } michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) { michael@0: LOG(("***failed: setsockopt RESET, errno %d", errno)); michael@0: // if errno == EALREADY, this is normal - we can't send another reset michael@0: // with one pending. michael@0: // When we get an incoming reset (which may be a response to our michael@0: // outstanding one), see if we have any pending outgoing resets and michael@0: // send them michael@0: } else { michael@0: mStreamsResetting.Clear(); michael@0: } michael@0: moz_free(srs); michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst) michael@0: { michael@0: uint32_t n, i; michael@0: nsRefPtr channel; // since we may null out the ref to the channel michael@0: michael@0: if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && michael@0: !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { michael@0: n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t); michael@0: for (i = 0; i < n; ++i) { michael@0: if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { michael@0: channel = FindChannelByStream(strrst->strreset_stream_list[i]); michael@0: if (channel) { michael@0: // The other side closed the channel michael@0: // We could be in three states: michael@0: // 1. Normal state (input and output streams (OPEN) michael@0: // Notify application, send a RESET in response on our michael@0: // outbound channel. Go to CLOSED michael@0: // 2. We sent our own reset (CLOSING); either they crossed on the michael@0: // wire, or this is a response to our Reset. michael@0: // Go to CLOSED michael@0: // 3. We've sent a open but haven't gotten a response yet (CONNECTING) michael@0: // I believe this is impossible, as we don't have an input stream yet. michael@0: michael@0: LOG(("Incoming: Channel %u closed, state %d", michael@0: channel->mStream, channel->mState)); michael@0: ASSERT_WEBRTC(channel->mState == DataChannel::OPEN || michael@0: channel->mState == DataChannel::CLOSING || michael@0: channel->mState == DataChannel::CONNECTING || michael@0: channel->mState == DataChannel::WAITING_TO_OPEN); michael@0: if (channel->mState == DataChannel::OPEN || michael@0: channel->mState == DataChannel::WAITING_TO_OPEN) { michael@0: ResetOutgoingStream(channel->mStream); michael@0: SendOutgoingStreamReset(); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, michael@0: channel)); michael@0: } michael@0: mStreams[channel->mStream] = nullptr; michael@0: michael@0: LOG(("Disconnected DataChannel %p from connection %p", michael@0: (void *) channel.get(), (void *) channel->mConnection.get())); michael@0: channel->Destroy(); michael@0: // At this point when we leave here, the object is a zombie held alive only by the DOM object michael@0: } else { michael@0: LOG(("Can't find incoming channel %d",i)); michael@0: } michael@0: } michael@0: } michael@0: } michael@0: michael@0: // In case we failed to send a RESET due to having one outstanding, process any pending resets now: michael@0: if (!mStreamsResetting.IsEmpty()) { michael@0: LOG(("Sending %d pending resets", mStreamsResetting.Length())); michael@0: SendOutgoingStreamReset(); michael@0: } michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg) michael@0: { michael@0: uint16_t stream; michael@0: uint32_t i; michael@0: nsRefPtr channel; michael@0: michael@0: if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { michael@0: LOG(("*** Failed increasing number of streams from %u (%u/%u)", michael@0: mStreams.Length(), michael@0: strchg->strchange_instrms, michael@0: strchg->strchange_outstrms)); michael@0: // XXX FIX! notify pending opens of failure michael@0: return; michael@0: } else { michael@0: if (strchg->strchange_instrms > mStreams.Length()) { michael@0: LOG(("Other side increased streams from %u to %u", michael@0: mStreams.Length(), strchg->strchange_instrms)); michael@0: } michael@0: if (strchg->strchange_outstrms > mStreams.Length() || michael@0: strchg->strchange_instrms > mStreams.Length()) { michael@0: uint16_t old_len = mStreams.Length(); michael@0: uint16_t new_len = std::max(strchg->strchange_outstrms, michael@0: strchg->strchange_instrms); michael@0: LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)", michael@0: old_len, new_len, new_len - old_len, michael@0: strchg->strchange_instrms)); michael@0: // make sure both are the same length michael@0: mStreams.AppendElements(new_len - old_len); michael@0: LOG(("New length = %d (was %d)", mStreams.Length(), old_len)); michael@0: for (size_t i = old_len; i < mStreams.Length(); ++i) { michael@0: mStreams[i] = nullptr; michael@0: } michael@0: // Re-process any channels waiting for streams. michael@0: // Linear search, but we don't increase channels often and michael@0: // the array would only get long in case of an app error normally michael@0: michael@0: // Make sure we request enough streams if there's a big jump in streams michael@0: // Could make a more complex API for OpenXxxFinish() and avoid this loop michael@0: int32_t num_needed = mPending.GetSize(); michael@0: LOG(("%d of %d new streams already needed", num_needed, michael@0: new_len - old_len)); michael@0: num_needed -= (new_len - old_len); // number we added michael@0: if (num_needed > 0) { michael@0: if (num_needed < 16) michael@0: num_needed = 16; michael@0: LOG(("Not enough new streams, asking for %d more", num_needed)); michael@0: RequestMoreStreams(num_needed); michael@0: } else if (strchg->strchange_outstrms < strchg->strchange_instrms) { michael@0: LOG(("Requesting %d output streams to match partner", michael@0: strchg->strchange_instrms - strchg->strchange_outstrms)); michael@0: RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms); michael@0: } michael@0: michael@0: ProcessQueuedOpens(); michael@0: } michael@0: // else probably not a change in # of streams michael@0: } michael@0: michael@0: for (i = 0; i < mStreams.Length(); ++i) { michael@0: channel = mStreams[i]; michael@0: if (!channel) michael@0: continue; michael@0: michael@0: if ((channel->mState == CONNECTING) && michael@0: (channel->mStream == INVALID_STREAM)) { michael@0: if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) || michael@0: (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) { michael@0: /* XXX: Signal to the other end. */ michael@0: channel->mState = CLOSED; michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, michael@0: channel)); michael@0: // maybe fire onError (bug 843625) michael@0: } else { michael@0: stream = FindFreeStream(); michael@0: if (stream != INVALID_STREAM) { michael@0: channel->mStream = stream; michael@0: mStreams[stream] = channel; michael@0: channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; michael@0: /// XXX fix michael@0: StartDefer(); michael@0: } else { michael@0: /* We will not find more ... */ michael@0: break; michael@0: } michael@0: } michael@0: } michael@0: } michael@0: } michael@0: michael@0: michael@0: // Called with mLock locked! michael@0: void michael@0: DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n) michael@0: { michael@0: mLock.AssertCurrentThreadOwns(); michael@0: if (notif->sn_header.sn_length != (uint32_t)n) { michael@0: return; michael@0: } michael@0: switch (notif->sn_header.sn_type) { michael@0: case SCTP_ASSOC_CHANGE: michael@0: HandleAssociationChangeEvent(&(notif->sn_assoc_change)); michael@0: break; michael@0: case SCTP_PEER_ADDR_CHANGE: michael@0: HandlePeerAddressChangeEvent(&(notif->sn_paddr_change)); michael@0: break; michael@0: case SCTP_REMOTE_ERROR: michael@0: HandleRemoteErrorEvent(&(notif->sn_remote_error)); michael@0: break; michael@0: case SCTP_SHUTDOWN_EVENT: michael@0: HandleShutdownEvent(&(notif->sn_shutdown_event)); michael@0: break; michael@0: case SCTP_ADAPTATION_INDICATION: michael@0: HandleAdaptationIndication(&(notif->sn_adaptation_event)); michael@0: break; michael@0: case SCTP_PARTIAL_DELIVERY_EVENT: michael@0: LOG(("SCTP_PARTIAL_DELIVERY_EVENT")); michael@0: break; michael@0: case SCTP_AUTHENTICATION_EVENT: michael@0: LOG(("SCTP_AUTHENTICATION_EVENT")); michael@0: break; michael@0: case SCTP_SENDER_DRY_EVENT: michael@0: //LOG(("SCTP_SENDER_DRY_EVENT")); michael@0: break; michael@0: case SCTP_NOTIFICATIONS_STOPPED_EVENT: michael@0: LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT")); michael@0: break; michael@0: case SCTP_SEND_FAILED_EVENT: michael@0: HandleSendFailedEvent(&(notif->sn_send_failed_event)); michael@0: break; michael@0: case SCTP_STREAM_RESET_EVENT: michael@0: HandleStreamResetEvent(&(notif->sn_strreset_event)); michael@0: break; michael@0: case SCTP_ASSOC_RESET_EVENT: michael@0: LOG(("SCTP_ASSOC_RESET_EVENT")); michael@0: break; michael@0: case SCTP_STREAM_CHANGE_EVENT: michael@0: HandleStreamChangeEvent(&(notif->sn_strchange_event)); michael@0: break; michael@0: default: michael@0: LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type)); michael@0: break; michael@0: } michael@0: } michael@0: michael@0: int michael@0: DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen, michael@0: struct sctp_rcvinfo rcv, int32_t flags) michael@0: { michael@0: ASSERT_WEBRTC(!NS_IsMainThread()); michael@0: michael@0: if (!data) { michael@0: usrsctp_close(sock); // SCTP has finished shutting down michael@0: } else { michael@0: MutexAutoLock lock(mLock); michael@0: if (flags & MSG_NOTIFICATION) { michael@0: HandleNotification(static_cast(data), datalen); michael@0: } else { michael@0: HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid); michael@0: } michael@0: } michael@0: // sctp allocates 'data' with malloc(), and expects the receiver to free michael@0: // it (presumably with free). michael@0: // XXX future optimization: try to deliver messages without an internal michael@0: // alloc/copy, and if so delay the free until later. michael@0: free(data); michael@0: // usrsctp defines the callback as returning an int, but doesn't use it michael@0: return 1; michael@0: } michael@0: michael@0: already_AddRefed michael@0: DataChannelConnection::Open(const nsACString& label, const nsACString& protocol, michael@0: Type type, bool inOrder, michael@0: uint32_t prValue, DataChannelListener *aListener, michael@0: nsISupports *aContext, bool aExternalNegotiated, michael@0: uint16_t aStream) michael@0: { michael@0: // aStream == INVALID_STREAM to have the protocol allocate michael@0: uint16_t prPolicy = SCTP_PR_SCTP_NONE; michael@0: uint32_t flags; michael@0: michael@0: LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u", michael@0: PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(), michael@0: type, inOrder, prValue, aListener, aContext, michael@0: aExternalNegotiated ? "true" : "false", aStream)); michael@0: switch (type) { michael@0: case DATA_CHANNEL_RELIABLE: michael@0: prPolicy = SCTP_PR_SCTP_NONE; michael@0: break; michael@0: case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: michael@0: prPolicy = SCTP_PR_SCTP_RTX; michael@0: break; michael@0: case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: michael@0: prPolicy = SCTP_PR_SCTP_TTL; michael@0: break; michael@0: } michael@0: if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) { michael@0: return nullptr; michael@0: } michael@0: michael@0: // Don't look past currently-negotiated streams michael@0: if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) { michael@0: LOG(("ERROR: external negotiation of already-open channel %u", aStream)); michael@0: // XXX How do we indicate this up to the application? Probably the michael@0: // caller's job, but we may need to return an error code. michael@0: return nullptr; michael@0: } michael@0: michael@0: flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0; michael@0: nsRefPtr channel(new DataChannel(this, michael@0: aStream, michael@0: DataChannel::CONNECTING, michael@0: label, protocol, michael@0: type, prValue, michael@0: flags, michael@0: aListener, aContext)); michael@0: if (aExternalNegotiated) { michael@0: channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED; michael@0: } michael@0: michael@0: MutexAutoLock lock(mLock); // OpenFinish assumes this michael@0: return OpenFinish(channel.forget()); michael@0: } michael@0: michael@0: // Separate routine so we can also call it to finish up from pending opens michael@0: already_AddRefed michael@0: DataChannelConnection::OpenFinish(already_AddRefed&& aChannel) michael@0: { michael@0: nsRefPtr channel(aChannel); // takes the reference passed in michael@0: // Normally 1 reference if called from ::Open(), or 2 if called from michael@0: // ProcessQueuedOpens() unless the DOMDataChannel was gc'd michael@0: uint16_t stream = channel->mStream; michael@0: bool queue = false; michael@0: michael@0: mLock.AssertCurrentThreadOwns(); michael@0: michael@0: // Cases we care about: michael@0: // Pre-negotiated: michael@0: // Not Open: michael@0: // Doesn't fit: michael@0: // -> change initial ask or renegotiate after open michael@0: // -> queue open michael@0: // Open: michael@0: // Doesn't fit: michael@0: // -> RequestMoreStreams && queue michael@0: // Does fit: michael@0: // -> open michael@0: // Not negotiated: michael@0: // Not Open: michael@0: // -> queue open michael@0: // Open: michael@0: // -> Try to get a stream michael@0: // Doesn't fit: michael@0: // -> RequestMoreStreams && queue michael@0: // Does fit: michael@0: // -> open michael@0: // So the Open cases are basically the same michael@0: // Not Open cases are simply queue for non-negotiated, and michael@0: // either change the initial ask or possibly renegotiate after open. michael@0: michael@0: if (mState == OPEN) { michael@0: if (stream == INVALID_STREAM) { michael@0: stream = FindFreeStream(); // may be INVALID_STREAM if we need more michael@0: } michael@0: if (stream == INVALID_STREAM || stream >= mStreams.Length()) { michael@0: // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams michael@0: // to avoid going back immediately for more if the ask to N, N+1, etc michael@0: int32_t more_needed = (stream == INVALID_STREAM) ? 16 : michael@0: (stream-((int32_t)mStreams.Length())) + 16; michael@0: if (!RequestMoreStreams(more_needed)) { michael@0: // Something bad happened... we're done michael@0: goto request_error_cleanup; michael@0: } michael@0: queue = true; michael@0: } michael@0: } else { michael@0: // not OPEN michael@0: if (stream != INVALID_STREAM && stream >= mStreams.Length() && michael@0: mState == CLOSED) { michael@0: // Update number of streams for init message michael@0: struct sctp_initmsg initmsg; michael@0: socklen_t len = sizeof(initmsg); michael@0: int32_t total_needed = stream+16; michael@0: michael@0: memset(&initmsg, 0, sizeof(initmsg)); michael@0: if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { michael@0: LOG(("*** failed getsockopt SCTP_INITMSG")); michael@0: goto request_error_cleanup; michael@0: } michael@0: LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed, michael@0: initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); michael@0: initmsg.sinit_num_ostreams = total_needed; michael@0: initmsg.sinit_max_instreams = MAX_NUM_STREAMS; michael@0: if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, michael@0: (socklen_t)sizeof(initmsg)) < 0) { michael@0: LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); michael@0: goto request_error_cleanup; michael@0: } michael@0: michael@0: int32_t old_len = mStreams.Length(); michael@0: mStreams.AppendElements(total_needed - old_len); michael@0: for (int32_t i = old_len; i < total_needed; ++i) { michael@0: mStreams[i] = nullptr; michael@0: } michael@0: } michael@0: // else if state is CONNECTING, we'll just re-negotiate when OpenFinish michael@0: // is called, if needed michael@0: queue = true; michael@0: } michael@0: if (queue) { michael@0: LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream)); michael@0: // Also serves to mark we told the app michael@0: channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN; michael@0: channel->AddRef(); // we need a ref for the nsDeQue and one to return michael@0: mPending.Push(channel); michael@0: return channel.forget(); michael@0: } michael@0: michael@0: MOZ_ASSERT(stream != INVALID_STREAM); michael@0: // just allocated (& OPEN), or externally negotiated michael@0: mStreams[stream] = channel; // holds a reference michael@0: channel->mStream = stream; michael@0: michael@0: #ifdef TEST_QUEUED_DATA michael@0: // It's painful to write a test for this... michael@0: channel->mState = OPEN; michael@0: channel->mReady = true; michael@0: SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST); michael@0: #endif michael@0: michael@0: if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) { michael@0: // Don't send unordered until this gets cleared michael@0: channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK; michael@0: } michael@0: michael@0: if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) { michael@0: if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol, michael@0: stream, michael@0: !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), michael@0: channel->mPrPolicy, channel->mPrValue)) { michael@0: LOG(("SendOpenRequest failed, errno = %d", errno)); michael@0: if (errno == EAGAIN || errno == EWOULDBLOCK) { michael@0: channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; michael@0: StartDefer(); michael@0: michael@0: return channel.forget(); michael@0: } else { michael@0: if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { michael@0: // We already returned the channel to the app. michael@0: NS_ERROR("Failed to send open request"); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, michael@0: channel)); michael@0: } michael@0: // If we haven't returned the channel yet, it will get destroyed when we exit michael@0: // this function. michael@0: mStreams[stream] = nullptr; michael@0: channel->mStream = INVALID_STREAM; michael@0: // we'll be destroying the channel michael@0: channel->mState = CLOSED; michael@0: return nullptr; michael@0: } michael@0: /* NOTREACHED */ michael@0: } michael@0: } michael@0: // Either externally negotiated or we sent Open michael@0: channel->mState = OPEN; michael@0: channel->mReady = true; michael@0: // FIX? Move into DOMDataChannel? I don't think we can send it yet here michael@0: LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get())); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, michael@0: channel)); michael@0: michael@0: return channel.forget(); michael@0: michael@0: request_error_cleanup: michael@0: channel->mState = CLOSED; michael@0: if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { michael@0: // We already returned the channel to the app. michael@0: NS_ERROR("Failed to request more streams"); michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, michael@0: channel)); michael@0: return channel.forget(); michael@0: } michael@0: // we'll be destroying the channel, but it never really got set up michael@0: // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and michael@0: // Dispatch it to ourselves michael@0: return nullptr; michael@0: } michael@0: michael@0: int32_t michael@0: DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data, michael@0: uint32_t length, uint32_t ppid) michael@0: { michael@0: uint16_t flags; michael@0: struct sctp_sendv_spa spa; michael@0: int32_t result; michael@0: michael@0: NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0); michael@0: NS_WARN_IF_FALSE(length > 0, "Length is 0?!"); michael@0: michael@0: // To avoid problems where an in-order OPEN is lost and an michael@0: // out-of-order data message "beats" it, require data to be in-order michael@0: // until we get an ACK. michael@0: if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && michael@0: !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) { michael@0: flags = SCTP_UNORDERED; michael@0: } else { michael@0: flags = 0; michael@0: } michael@0: michael@0: spa.sendv_sndinfo.snd_ppid = htonl(ppid); michael@0: spa.sendv_sndinfo.snd_sid = channel->mStream; michael@0: spa.sendv_sndinfo.snd_flags = flags; michael@0: spa.sendv_sndinfo.snd_context = 0; michael@0: spa.sendv_sndinfo.snd_assoc_id = 0; michael@0: spa.sendv_flags = SCTP_SEND_SNDINFO_VALID; michael@0: michael@0: if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) { michael@0: spa.sendv_prinfo.pr_policy = channel->mPrPolicy; michael@0: spa.sendv_prinfo.pr_value = channel->mPrValue; michael@0: spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; michael@0: } michael@0: michael@0: // Note: Main-thread IO, but doesn't block! michael@0: // XXX FIX! to deal with heavy overruns of JS trying to pass data in michael@0: // (more than the buffersize) queue data onto another thread to do the michael@0: // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp michael@0: michael@0: // SCTP will return EMSGSIZE if the message is bigger than the buffer michael@0: // size (or EAGAIN if there isn't space) michael@0: if (channel->mBufferedData.IsEmpty()) { michael@0: result = usrsctp_sendv(mSocket, data, length, michael@0: nullptr, 0, michael@0: (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa), michael@0: SCTP_SENDV_SPA, 0); michael@0: LOG(("Sent buffer (len=%u), result=%d", length, result)); michael@0: } else { michael@0: // Fake EAGAIN if we're already buffering data michael@0: result = -1; michael@0: errno = EAGAIN; michael@0: } michael@0: if (result < 0) { michael@0: if (errno == EAGAIN || errno == EWOULDBLOCK) { michael@0: // queue data for resend! And queue any further data for the stream until it is... michael@0: BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc michael@0: channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array michael@0: channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA; michael@0: LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length)); michael@0: StartDefer(); michael@0: return 0; michael@0: } michael@0: LOG(("error %d sending string", errno)); michael@0: } michael@0: return result; michael@0: } michael@0: michael@0: // Handles fragmenting binary messages michael@0: int32_t michael@0: DataChannelConnection::SendBinary(DataChannel *channel, const char *data, michael@0: uint32_t len, michael@0: uint32_t ppid_partial, uint32_t ppid_final) michael@0: { michael@0: // Since there's a limit on network buffer size and no limits on message michael@0: // size, and we don't want to use EOR mode (multiple writes for a michael@0: // message, but all other streams are blocked until you finish sending michael@0: // this message), we need to add application-level fragmentation of large michael@0: // messages. On a reliable channel, these can be simply rebuilt into a michael@0: // large message. On an unreliable channel, we can't and don't know how michael@0: // long to wait, and there are no retransmissions, and no easy way to michael@0: // tell the user "this part is missing", so on unreliable channels we michael@0: // need to return an error if sending more bytes than the network buffers michael@0: // can hold, and perhaps a lower number. michael@0: michael@0: // We *really* don't want to do this from main thread! - and SendMsgInternal michael@0: // avoids blocking. michael@0: // This MUST be reliable and in-order for the reassembly to work michael@0: if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT && michael@0: channel->mPrPolicy == DATA_CHANNEL_RELIABLE && michael@0: !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) { michael@0: int32_t sent=0; michael@0: uint32_t origlen = len; michael@0: LOG(("Sending binary message length %u in chunks", len)); michael@0: // XXX check flags for out-of-order, or force in-order for large binary messages michael@0: while (len > 0) { michael@0: uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT); michael@0: uint32_t ppid; michael@0: len -= sendlen; michael@0: ppid = len > 0 ? ppid_partial : ppid_final; michael@0: LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid)); michael@0: // Note that these might end up being deferred and queued. michael@0: sent += SendMsgInternal(channel, data, sendlen, ppid); michael@0: data += sendlen; michael@0: } michael@0: LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued", michael@0: (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT, michael@0: origlen, sent, michael@0: channel->mBufferedData.Length())); michael@0: return sent; michael@0: } michael@0: NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT, michael@0: "Sending too-large data on unreliable channel!"); michael@0: michael@0: // This will fail if the message is too large (default 256K) michael@0: return SendMsgInternal(channel, data, len, ppid_final); michael@0: } michael@0: michael@0: class ReadBlobRunnable : public nsRunnable { michael@0: public: michael@0: ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream, michael@0: nsIInputStream* aBlob) : michael@0: mConnection(aConnection), michael@0: mStream(aStream), michael@0: mBlob(aBlob) michael@0: { } michael@0: michael@0: NS_IMETHODIMP Run() { michael@0: // ReadBlob() is responsible to releasing the reference michael@0: DataChannelConnection *self = mConnection; michael@0: self->ReadBlob(mConnection.forget(), mStream, mBlob); michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: // Make sure the Connection doesn't die while there are jobs outstanding. michael@0: // Let it die (if released by PeerConnectionImpl while we're running) michael@0: // when we send our runnable back to MainThread. Then ~DataChannelConnection michael@0: // can send the IOThread to MainThread to die in a runnable, avoiding michael@0: // unsafe event loop recursion. Evil. michael@0: nsRefPtr mConnection; michael@0: uint16_t mStream; michael@0: // Use RefCount for preventing the object is deleted when SendBlob returns. michael@0: nsRefPtr mBlob; michael@0: }; michael@0: michael@0: int32_t michael@0: DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob) michael@0: { michael@0: DataChannel *channel = mStreams[stream]; michael@0: NS_ENSURE_TRUE(channel, 0); michael@0: // Spawn a thread to send the data michael@0: if (!mInternalIOThread) { michael@0: nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread)); michael@0: if (NS_FAILED(res)) { michael@0: return -1; michael@0: } michael@0: } michael@0: michael@0: nsCOMPtr runnable = new ReadBlobRunnable(this, stream, aBlob); michael@0: mInternalIOThread->Dispatch(runnable, NS_DISPATCH_NORMAL); michael@0: return 0; michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::ReadBlob(already_AddRefed aThis, michael@0: uint16_t aStream, nsIInputStream* aBlob) michael@0: { michael@0: // NOTE: 'aThis' has been forgotten by the caller to avoid releasing michael@0: // it off mainthread; if PeerConnectionImpl has released then we want michael@0: // ~DataChannelConnection() to run on MainThread michael@0: michael@0: // XXX to do this safely, we must enqueue these atomically onto the michael@0: // output socket. We need a sender thread(s?) to enque data into the michael@0: // socket and to avoid main-thread IO that might block. Even on a michael@0: // background thread, we may not want to block on one stream's data. michael@0: // I.e. run non-blocking and service multiple channels. michael@0: michael@0: // For now as a hack, send as a single blast of queued packets which may michael@0: // be deferred until buffer space is available. michael@0: nsCString temp; michael@0: uint64_t len; michael@0: nsCOMPtr mainThread; michael@0: NS_GetMainThread(getter_AddRefs(mainThread)); michael@0: michael@0: if (NS_FAILED(aBlob->Available(&len)) || michael@0: NS_FAILED(NS_ReadInputStreamToString(aBlob, temp, len))) { michael@0: // Bug 966602: Doesn't return an error to the caller via onerror. michael@0: // We must release DataChannelConnection on MainThread to avoid issues (bug 876167) michael@0: NS_ProxyRelease(mainThread, aThis.take()); michael@0: return; michael@0: } michael@0: aBlob->Close(); michael@0: RUN_ON_THREAD(mainThread, WrapRunnable(nsRefPtr(aThis), michael@0: &DataChannelConnection::SendBinaryMsg, michael@0: aStream, temp), michael@0: NS_DISPATCH_NORMAL); michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::GetStreamIds(std::vector* aStreamList) michael@0: { michael@0: ASSERT_WEBRTC(NS_IsMainThread()); michael@0: for (uint32_t i = 0; i < mStreams.Length(); ++i) { michael@0: if (mStreams[i]) { michael@0: aStreamList->push_back(mStreams[i]->mStream); michael@0: } michael@0: } michael@0: } michael@0: michael@0: int32_t michael@0: DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg, michael@0: bool isBinary) michael@0: { michael@0: ASSERT_WEBRTC(NS_IsMainThread()); michael@0: // We really could allow this from other threads, so long as we deal with michael@0: // asynchronosity issues with channels closing, in particular access to michael@0: // mStreams, and issues with the association closing (access to mSocket). michael@0: michael@0: const char *data = aMsg.BeginReading(); michael@0: uint32_t len = aMsg.Length(); michael@0: DataChannel *channel; michael@0: michael@0: LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len)); michael@0: // XXX if we want more efficiency, translate flags once at open time michael@0: channel = mStreams[stream]; michael@0: NS_ENSURE_TRUE(channel, 0); michael@0: michael@0: if (isBinary) michael@0: return SendBinary(channel, data, len, michael@0: DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST); michael@0: return SendBinary(channel, data, len, michael@0: DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST); michael@0: } michael@0: michael@0: void michael@0: DataChannelConnection::Close(DataChannel *aChannel) michael@0: { michael@0: MutexAutoLock lock(mLock); michael@0: CloseInt(aChannel); michael@0: } michael@0: michael@0: // So we can call Close() with the lock already held michael@0: // Called from someone who holds a ref via ::Close(), or from ~DataChannel michael@0: void michael@0: DataChannelConnection::CloseInt(DataChannel *aChannel) michael@0: { michael@0: MOZ_ASSERT(aChannel); michael@0: nsRefPtr channel(aChannel); // make sure it doesn't go away on us michael@0: michael@0: mLock.AssertCurrentThreadOwns(); michael@0: LOG(("Connection %p/Channel %p: Closing stream %u", michael@0: channel->mConnection.get(), channel.get(), channel->mStream)); michael@0: // re-test since it may have closed before the lock was grabbed michael@0: if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) { michael@0: LOG(("Channel already closing/closed (%u)", aChannel->mState)); michael@0: if (mState == CLOSED && channel->mStream != INVALID_STREAM) { michael@0: // called from CloseAll() michael@0: // we're not going to hang around waiting any more michael@0: mStreams[channel->mStream] = nullptr; michael@0: } michael@0: return; michael@0: } michael@0: aChannel->mBufferedData.Clear(); michael@0: if (channel->mStream != INVALID_STREAM) { michael@0: ResetOutgoingStream(channel->mStream); michael@0: if (mState == CLOSED) { // called from CloseAll() michael@0: // Let resets accumulate then send all at once in CloseAll() michael@0: // we're not going to hang around waiting michael@0: mStreams[channel->mStream] = nullptr; michael@0: } else { michael@0: SendOutgoingStreamReset(); michael@0: } michael@0: } michael@0: aChannel->mState = CLOSING; michael@0: if (mState == CLOSED) { michael@0: // we're not going to hang around waiting michael@0: channel->Destroy(); michael@0: } michael@0: // At this point when we leave here, the object is a zombie held alive only by the DOM object michael@0: } michael@0: michael@0: void DataChannelConnection::CloseAll() michael@0: { michael@0: LOG(("Closing all channels (connection %p)", (void*) this)); michael@0: // Don't need to lock here michael@0: michael@0: // Make sure no more channels will be opened michael@0: { michael@0: MutexAutoLock lock(mLock); michael@0: mState = CLOSED; michael@0: } michael@0: michael@0: // Close current channels michael@0: // If there are runnables, they hold a strong ref and keep the channel michael@0: // and/or connection alive (even if in a CLOSED state) michael@0: bool closed_some = false; michael@0: for (uint32_t i = 0; i < mStreams.Length(); ++i) { michael@0: if (mStreams[i]) { michael@0: mStreams[i]->Close(); michael@0: closed_some = true; michael@0: } michael@0: } michael@0: michael@0: // Clean up any pending opens for channels michael@0: nsRefPtr channel; michael@0: while (nullptr != (channel = dont_AddRef(static_cast(mPending.PopFront())))) { michael@0: LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream)); michael@0: channel->Close(); // also releases the ref on each iteration michael@0: closed_some = true; michael@0: } michael@0: // It's more efficient to let the Resets queue in shutdown and then michael@0: // SendOutgoingStreamReset() here. michael@0: if (closed_some) { michael@0: MutexAutoLock lock(mLock); michael@0: SendOutgoingStreamReset(); michael@0: } michael@0: } michael@0: michael@0: DataChannel::~DataChannel() michael@0: { michael@0: // NS_ASSERTION since this is more "I think I caught all the cases that michael@0: // can cause this" than a true kill-the-program assertion. If this is michael@0: // wrong, nothing bad happens. A worst it's a leak. michael@0: NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel"); michael@0: } michael@0: michael@0: void michael@0: DataChannel::Close() michael@0: { michael@0: ENSURE_DATACONNECTION; michael@0: mConnection->Close(this); michael@0: } michael@0: michael@0: // Used when disconnecting from the DataChannelConnection michael@0: void michael@0: DataChannel::Destroy() michael@0: { michael@0: ENSURE_DATACONNECTION; michael@0: michael@0: LOG(("Destroying Data channel %u", mStream)); michael@0: MOZ_ASSERT_IF(mStream != INVALID_STREAM, michael@0: !mConnection->FindChannelByStream(mStream)); michael@0: mStream = INVALID_STREAM; michael@0: mState = CLOSED; michael@0: mConnection = nullptr; michael@0: } michael@0: michael@0: void michael@0: DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext) michael@0: { michael@0: MutexAutoLock mLock(mListenerLock); michael@0: mContext = aContext; michael@0: mListener = aListener; michael@0: } michael@0: michael@0: // May be called from another (i.e. Main) thread! michael@0: void michael@0: DataChannel::AppReady() michael@0: { michael@0: ENSURE_DATACONNECTION; michael@0: michael@0: MutexAutoLock lock(mConnection->mLock); michael@0: michael@0: mReady = true; michael@0: if (mState == WAITING_TO_OPEN) { michael@0: mState = OPEN; michael@0: NS_DispatchToMainThread(new DataChannelOnMessageAvailable( michael@0: DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection, michael@0: this)); michael@0: for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) { michael@0: nsCOMPtr runnable = mQueuedMessages[i]; michael@0: MOZ_ASSERT(runnable); michael@0: NS_DispatchToMainThread(runnable); michael@0: } michael@0: } else { michael@0: NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN"); michael@0: } michael@0: mQueuedMessages.Clear(); michael@0: mQueuedMessages.Compact(); michael@0: // We never use it again... We could even allocate the array in the odd michael@0: // cases we need it. michael@0: } michael@0: michael@0: uint32_t michael@0: DataChannel::GetBufferedAmount() michael@0: { michael@0: uint32_t buffered = 0; michael@0: for (uint32_t i = 0; i < mBufferedData.Length(); ++i) { michael@0: buffered += mBufferedData[i]->mLength; michael@0: } michael@0: return buffered; michael@0: } michael@0: michael@0: // Called with mLock locked! michael@0: void michael@0: DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage) michael@0: { michael@0: if (!mReady && michael@0: (mState == CONNECTING || mState == WAITING_TO_OPEN)) { michael@0: mQueuedMessages.AppendElement(aMessage); michael@0: } else { michael@0: NS_DispatchToMainThread(aMessage); michael@0: } michael@0: } michael@0: michael@0: } // namespace mozilla