michael@0: /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* vim: set sw=2 ts=8 et tw=80 : */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "WebSocketLog.h" michael@0: #include "WebSocketChannel.h" michael@0: michael@0: #include "mozilla/Atomics.h" michael@0: #include "mozilla/Attributes.h" michael@0: #include "mozilla/Endian.h" michael@0: #include "mozilla/MathAlgorithms.h" michael@0: michael@0: #include "nsIURI.h" michael@0: #include "nsIChannel.h" michael@0: #include "nsICryptoHash.h" michael@0: #include "nsIRunnable.h" michael@0: #include "nsIPrefBranch.h" michael@0: #include "nsIPrefService.h" michael@0: #include "nsICancelable.h" michael@0: #include "nsIDNSRecord.h" michael@0: #include "nsIDNSService.h" michael@0: #include "nsIStreamConverterService.h" michael@0: #include "nsIIOService2.h" michael@0: #include "nsIProtocolProxyService.h" michael@0: #include "nsIProxyInfo.h" michael@0: #include "nsIProxiedChannel.h" michael@0: #include "nsIAsyncVerifyRedirectCallback.h" michael@0: #include "nsIDashboardEventNotifier.h" michael@0: #include "nsIEventTarget.h" michael@0: #include "nsIHttpChannel.h" michael@0: #include "nsILoadGroup.h" michael@0: #include "nsIProtocolHandler.h" michael@0: #include "nsIRandomGenerator.h" michael@0: #include "nsISocketTransport.h" michael@0: #include "nsThreadUtils.h" michael@0: michael@0: #include "nsAutoPtr.h" michael@0: #include "nsNetCID.h" michael@0: #include "nsServiceManagerUtils.h" michael@0: #include "nsCRT.h" michael@0: #include "nsThreadUtils.h" michael@0: #include "nsError.h" michael@0: #include "nsStringStream.h" michael@0: #include "nsAlgorithm.h" michael@0: #include "nsProxyRelease.h" michael@0: #include "nsNetUtil.h" michael@0: #include "mozilla/StaticMutex.h" michael@0: #include "mozilla/Telemetry.h" michael@0: #include "mozilla/TimeStamp.h" michael@0: michael@0: #include "plbase64.h" michael@0: #include "prmem.h" michael@0: #include "prnetdb.h" michael@0: #include "zlib.h" michael@0: #include michael@0: michael@0: #ifdef MOZ_WIDGET_GONK michael@0: #include "NetStatistics.h" michael@0: #endif michael@0: michael@0: // rather than slurp up all of nsIWebSocket.idl, which lives outside necko, just michael@0: // dupe one constant we need from it michael@0: #define CLOSE_GOING_AWAY 1001 michael@0: michael@0: extern PRThread *gSocketThread; michael@0: michael@0: using namespace mozilla; michael@0: using namespace mozilla::net; michael@0: michael@0: namespace mozilla { michael@0: namespace net { michael@0: michael@0: NS_IMPL_ISUPPORTS(WebSocketChannel, michael@0: nsIWebSocketChannel, michael@0: nsIHttpUpgradeListener, michael@0: nsIRequestObserver, michael@0: nsIStreamListener, michael@0: nsIProtocolHandler, michael@0: nsIInputStreamCallback, michael@0: nsIOutputStreamCallback, michael@0: nsITimerCallback, michael@0: nsIDNSListener, michael@0: nsIProtocolProxyCallback, michael@0: nsIInterfaceRequestor, michael@0: nsIChannelEventSink, michael@0: nsIThreadRetargetableRequest) michael@0: michael@0: // We implement RFC 6455, which uses Sec-WebSocket-Version: 13 on the wire. michael@0: #define SEC_WEBSOCKET_VERSION "13" michael@0: michael@0: /* michael@0: * About SSL unsigned certificates michael@0: * michael@0: * wss will not work to a host using an unsigned certificate unless there michael@0: * is already an exception (i.e. it cannot popup a dialog asking for michael@0: * a security exception). This is similar to how an inlined img will michael@0: * fail without a dialog if fails for the same reason. This should not michael@0: * be a problem in practice as it is expected the websocket javascript michael@0: * is served from the same host as the websocket server (or of course, michael@0: * a valid cert could just be provided). michael@0: * michael@0: */ michael@0: michael@0: // some helper classes michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // FailDelayManager michael@0: // michael@0: // Stores entries (searchable by {host, port}) of connections that have recently michael@0: // failed, so we can do delay of reconnects per RFC 6455 Section 7.2.3 michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: michael@0: // Initial reconnect delay is randomly chosen between 200-400 ms. michael@0: // This is a gentler backoff than the 0-5 seconds the spec offhandedly suggests. michael@0: const uint32_t kWSReconnectInitialBaseDelay = 200; michael@0: const uint32_t kWSReconnectInitialRandomDelay = 200; michael@0: michael@0: // Base lifetime (in ms) of a FailDelay: kept longer if more failures occur michael@0: const uint32_t kWSReconnectBaseLifeTime = 60 * 1000; michael@0: // Maximum reconnect delay (in ms) michael@0: const uint32_t kWSReconnectMaxDelay = 60 * 1000; michael@0: michael@0: // hold record of failed connections, and calculates needed delay for reconnects michael@0: // to same host/port. michael@0: class FailDelay michael@0: { michael@0: public: michael@0: FailDelay(nsCString address, int32_t port) michael@0: : mAddress(address), mPort(port) michael@0: { michael@0: mLastFailure = TimeStamp::Now(); michael@0: mNextDelay = kWSReconnectInitialBaseDelay + michael@0: (rand() % kWSReconnectInitialRandomDelay); michael@0: } michael@0: michael@0: // Called to update settings when connection fails again. michael@0: void FailedAgain() michael@0: { michael@0: mLastFailure = TimeStamp::Now(); michael@0: // We use a truncated exponential backoff as suggested by RFC 6455, michael@0: // but multiply by 1.5 instead of 2 to be more gradual. michael@0: mNextDelay = static_cast( michael@0: std::min(kWSReconnectMaxDelay, mNextDelay * 1.5)); michael@0: LOG(("WebSocket: FailedAgain: host=%s, port=%d: incremented delay to %lu", michael@0: mAddress.get(), mPort, mNextDelay)); michael@0: } michael@0: michael@0: // returns 0 if there is no need to delay (i.e. delay interval is over) michael@0: uint32_t RemainingDelay(TimeStamp rightNow) michael@0: { michael@0: TimeDuration dur = rightNow - mLastFailure; michael@0: uint32_t sinceFail = (uint32_t) dur.ToMilliseconds(); michael@0: if (sinceFail > mNextDelay) michael@0: return 0; michael@0: michael@0: return mNextDelay - sinceFail; michael@0: } michael@0: michael@0: bool IsExpired(TimeStamp rightNow) michael@0: { michael@0: return (mLastFailure + michael@0: TimeDuration::FromMilliseconds(kWSReconnectBaseLifeTime + mNextDelay)) michael@0: <= rightNow; michael@0: } michael@0: michael@0: nsCString mAddress; // IP address (or hostname if using proxy) michael@0: int32_t mPort; michael@0: michael@0: private: michael@0: TimeStamp mLastFailure; // Time of last failed attempt michael@0: // mLastFailure + mNextDelay is the soonest we'll allow a reconnect michael@0: uint32_t mNextDelay; // milliseconds michael@0: }; michael@0: michael@0: class FailDelayManager michael@0: { michael@0: public: michael@0: FailDelayManager() michael@0: { michael@0: MOZ_COUNT_CTOR(FailDelayManager); michael@0: michael@0: mDelaysDisabled = false; michael@0: michael@0: nsCOMPtr prefService = michael@0: do_GetService(NS_PREFSERVICE_CONTRACTID); michael@0: bool boolpref = true; michael@0: nsresult rv; michael@0: rv = prefService->GetBoolPref("network.websocket.delay-failed-reconnects", michael@0: &boolpref); michael@0: if (NS_SUCCEEDED(rv) && !boolpref) { michael@0: mDelaysDisabled = true; michael@0: } michael@0: } michael@0: michael@0: ~FailDelayManager() michael@0: { michael@0: MOZ_COUNT_DTOR(FailDelayManager); michael@0: for (uint32_t i = 0; i < mEntries.Length(); i++) { michael@0: delete mEntries[i]; michael@0: } michael@0: } michael@0: michael@0: void Add(nsCString &address, int32_t port) michael@0: { michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: if (mDelaysDisabled) michael@0: return; michael@0: michael@0: FailDelay *record = new FailDelay(address, port); michael@0: mEntries.AppendElement(record); michael@0: } michael@0: michael@0: // Element returned may not be valid after next main thread event: don't keep michael@0: // pointer to it around michael@0: FailDelay* Lookup(nsCString &address, int32_t port, michael@0: uint32_t *outIndex = nullptr) michael@0: { michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: if (mDelaysDisabled) michael@0: return nullptr; michael@0: michael@0: FailDelay *result = nullptr; michael@0: TimeStamp rightNow = TimeStamp::Now(); michael@0: michael@0: // We also remove expired entries during search: iterate from end to make michael@0: // indexing simpler michael@0: for (int32_t i = mEntries.Length() - 1; i >= 0; --i) { michael@0: FailDelay *fail = mEntries[i]; michael@0: if (fail->mAddress.Equals(address) && fail->mPort == port) { michael@0: if (outIndex) michael@0: *outIndex = i; michael@0: result = fail; michael@0: // break here: removing more entries would mess up *outIndex. michael@0: // Any remaining expired entries will be deleted next time Lookup michael@0: // finds nothing, which is the most common case anyway. michael@0: break; michael@0: } else if (fail->IsExpired(rightNow)) { michael@0: mEntries.RemoveElementAt(i); michael@0: delete fail; michael@0: } michael@0: } michael@0: return result; michael@0: } michael@0: michael@0: // returns true if channel connects immediately, or false if it's delayed michael@0: void DelayOrBegin(WebSocketChannel *ws) michael@0: { michael@0: if (!mDelaysDisabled) { michael@0: uint32_t failIndex = 0; michael@0: FailDelay *fail = Lookup(ws->mAddress, ws->mPort, &failIndex); michael@0: michael@0: if (fail) { michael@0: TimeStamp rightNow = TimeStamp::Now(); michael@0: michael@0: uint32_t remainingDelay = fail->RemainingDelay(rightNow); michael@0: if (remainingDelay) { michael@0: // reconnecting within delay interval: delay by remaining time michael@0: nsresult rv; michael@0: ws->mReconnectDelayTimer = michael@0: do_CreateInstance("@mozilla.org/timer;1", &rv); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: rv = ws->mReconnectDelayTimer->InitWithCallback( michael@0: ws, remainingDelay, nsITimer::TYPE_ONE_SHOT); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: LOG(("WebSocket: delaying websocket [this=%p] by %lu ms", michael@0: ws, (unsigned long)remainingDelay)); michael@0: ws->mConnecting = CONNECTING_DELAYED; michael@0: return; michael@0: } michael@0: } michael@0: // if timer fails (which is very unlikely), drop down to BeginOpen call michael@0: } else if (fail->IsExpired(rightNow)) { michael@0: mEntries.RemoveElementAt(failIndex); michael@0: delete fail; michael@0: } michael@0: } michael@0: } michael@0: michael@0: // Delays disabled, or no previous failure, or we're reconnecting after scheduled michael@0: // delay interval has passed: connect. michael@0: ws->BeginOpen(); michael@0: } michael@0: michael@0: // Remove() also deletes all expired entries as it iterates: better for michael@0: // battery life than using a periodic timer. michael@0: void Remove(nsCString &address, int32_t port) michael@0: { michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: TimeStamp rightNow = TimeStamp::Now(); michael@0: michael@0: // iterate from end, to make deletion indexing easier michael@0: for (int32_t i = mEntries.Length() - 1; i >= 0; --i) { michael@0: FailDelay *entry = mEntries[i]; michael@0: if ((entry->mAddress.Equals(address) && entry->mPort == port) || michael@0: entry->IsExpired(rightNow)) { michael@0: mEntries.RemoveElementAt(i); michael@0: delete entry; michael@0: } michael@0: } michael@0: } michael@0: michael@0: private: michael@0: nsTArray mEntries; michael@0: bool mDelaysDisabled; michael@0: }; michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsWSAdmissionManager michael@0: // michael@0: // 1) Ensures that only one websocket at a time is CONNECTING to a given IP michael@0: // address (or hostname, if using proxy), per RFC 6455 Section 4.1. michael@0: // 2) Delays reconnects to IP/host after connection failure, per Section 7.2.3 michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class nsWSAdmissionManager michael@0: { michael@0: public: michael@0: static void Init() michael@0: { michael@0: StaticMutexAutoLock lock(sLock); michael@0: if (!sManager) { michael@0: sManager = new nsWSAdmissionManager(); michael@0: } michael@0: } michael@0: michael@0: static void Shutdown() michael@0: { michael@0: StaticMutexAutoLock lock(sLock); michael@0: delete sManager; michael@0: sManager = nullptr; michael@0: } michael@0: michael@0: // Determine if we will open connection immediately (returns true), or michael@0: // delay/queue the connection (returns false) michael@0: static void ConditionallyConnect(WebSocketChannel *ws) michael@0: { michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: NS_ABORT_IF_FALSE(ws->mConnecting == NOT_CONNECTING, "opening state"); michael@0: michael@0: StaticMutexAutoLock lock(sLock); michael@0: if (!sManager) { michael@0: return; michael@0: } michael@0: michael@0: // If there is already another WS channel connecting to this IP address, michael@0: // defer BeginOpen and mark as waiting in queue. michael@0: bool found = (sManager->IndexOf(ws->mAddress) >= 0); michael@0: michael@0: // Always add ourselves to queue, even if we'll connect immediately michael@0: nsOpenConn *newdata = new nsOpenConn(ws->mAddress, ws); michael@0: sManager->mQueue.AppendElement(newdata); michael@0: michael@0: if (found) { michael@0: ws->mConnecting = CONNECTING_QUEUED; michael@0: } else { michael@0: sManager->mFailures.DelayOrBegin(ws); michael@0: } michael@0: } michael@0: michael@0: static void OnConnected(WebSocketChannel *aChannel) michael@0: { michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: NS_ABORT_IF_FALSE(aChannel->mConnecting == CONNECTING_IN_PROGRESS, michael@0: "Channel completed connect, but not connecting?"); michael@0: michael@0: StaticMutexAutoLock lock(sLock); michael@0: if (!sManager) { michael@0: return; michael@0: } michael@0: michael@0: aChannel->mConnecting = NOT_CONNECTING; michael@0: michael@0: // Remove from queue michael@0: sManager->RemoveFromQueue(aChannel); michael@0: michael@0: // Connection succeeded, so stop keeping track of any previous failures michael@0: sManager->mFailures.Remove(aChannel->mAddress, aChannel->mPort); michael@0: michael@0: // Check for queued connections to same host. michael@0: // Note: still need to check for failures, since next websocket with same michael@0: // host may have different port michael@0: sManager->ConnectNext(aChannel->mAddress); michael@0: } michael@0: michael@0: // Called every time a websocket channel ends its session (including going away michael@0: // w/o ever successfully creating a connection) michael@0: static void OnStopSession(WebSocketChannel *aChannel, nsresult aReason) michael@0: { michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: StaticMutexAutoLock lock(sLock); michael@0: if (!sManager) { michael@0: return; michael@0: } michael@0: michael@0: if (NS_FAILED(aReason)) { michael@0: // Have we seen this failure before? michael@0: FailDelay *knownFailure = sManager->mFailures.Lookup(aChannel->mAddress, michael@0: aChannel->mPort); michael@0: if (knownFailure) { michael@0: if (aReason == NS_ERROR_NOT_CONNECTED) { michael@0: // Don't count close() before connection as a network error michael@0: LOG(("Websocket close() before connection to %s, %d completed" michael@0: " [this=%p]", aChannel->mAddress.get(), (int)aChannel->mPort, michael@0: aChannel)); michael@0: } else { michael@0: // repeated failure to connect: increase delay for next connection michael@0: knownFailure->FailedAgain(); michael@0: } michael@0: } else { michael@0: // new connection failure: record it. michael@0: LOG(("WebSocket: connection to %s, %d failed: [this=%p]", michael@0: aChannel->mAddress.get(), (int)aChannel->mPort, aChannel)); michael@0: sManager->mFailures.Add(aChannel->mAddress, aChannel->mPort); michael@0: } michael@0: } michael@0: michael@0: if (aChannel->mConnecting) { michael@0: // Only way a connecting channel may get here w/o failing is if it was michael@0: // closed with GOING_AWAY (1001) because of navigation, tab close, etc. michael@0: NS_ABORT_IF_FALSE(NS_FAILED(aReason) || michael@0: aChannel->mScriptCloseCode == CLOSE_GOING_AWAY, michael@0: "websocket closed while connecting w/o failing?"); michael@0: michael@0: sManager->RemoveFromQueue(aChannel); michael@0: michael@0: bool wasNotQueued = (aChannel->mConnecting != CONNECTING_QUEUED); michael@0: aChannel->mConnecting = NOT_CONNECTING; michael@0: if (wasNotQueued) { michael@0: sManager->ConnectNext(aChannel->mAddress); michael@0: } michael@0: } michael@0: } michael@0: michael@0: static void IncrementSessionCount() michael@0: { michael@0: StaticMutexAutoLock lock(sLock); michael@0: if (!sManager) { michael@0: return; michael@0: } michael@0: sManager->mSessionCount++; michael@0: } michael@0: michael@0: static void DecrementSessionCount() michael@0: { michael@0: StaticMutexAutoLock lock(sLock); michael@0: if (!sManager) { michael@0: return; michael@0: } michael@0: sManager->mSessionCount--; michael@0: } michael@0: michael@0: static void GetSessionCount(int32_t &aSessionCount) michael@0: { michael@0: StaticMutexAutoLock lock(sLock); michael@0: if (!sManager) { michael@0: return; michael@0: } michael@0: aSessionCount = sManager->mSessionCount; michael@0: } michael@0: michael@0: private: michael@0: nsWSAdmissionManager() : mSessionCount(0) michael@0: { michael@0: MOZ_COUNT_CTOR(nsWSAdmissionManager); michael@0: } michael@0: michael@0: ~nsWSAdmissionManager() michael@0: { michael@0: MOZ_COUNT_DTOR(nsWSAdmissionManager); michael@0: for (uint32_t i = 0; i < mQueue.Length(); i++) michael@0: delete mQueue[i]; michael@0: } michael@0: michael@0: class nsOpenConn michael@0: { michael@0: public: michael@0: nsOpenConn(nsCString &addr, WebSocketChannel *channel) michael@0: : mAddress(addr), mChannel(channel) { MOZ_COUNT_CTOR(nsOpenConn); } michael@0: ~nsOpenConn() { MOZ_COUNT_DTOR(nsOpenConn); } michael@0: michael@0: nsCString mAddress; michael@0: WebSocketChannel *mChannel; michael@0: }; michael@0: michael@0: void ConnectNext(nsCString &hostName) michael@0: { michael@0: int32_t index = IndexOf(hostName); michael@0: if (index >= 0) { michael@0: WebSocketChannel *chan = mQueue[index]->mChannel; michael@0: michael@0: NS_ABORT_IF_FALSE(chan->mConnecting == CONNECTING_QUEUED, michael@0: "transaction not queued but in queue"); michael@0: LOG(("WebSocket: ConnectNext: found channel [this=%p] in queue", chan)); michael@0: michael@0: mFailures.DelayOrBegin(chan); michael@0: } michael@0: } michael@0: michael@0: void RemoveFromQueue(WebSocketChannel *aChannel) michael@0: { michael@0: int32_t index = IndexOf(aChannel); michael@0: NS_ABORT_IF_FALSE(index >= 0, "connection to remove not in queue"); michael@0: if (index >= 0) { michael@0: nsOpenConn *olddata = mQueue[index]; michael@0: mQueue.RemoveElementAt(index); michael@0: delete olddata; michael@0: } michael@0: } michael@0: michael@0: int32_t IndexOf(nsCString &aStr) michael@0: { michael@0: for (uint32_t i = 0; i < mQueue.Length(); i++) michael@0: if (aStr == (mQueue[i])->mAddress) michael@0: return i; michael@0: return -1; michael@0: } michael@0: michael@0: int32_t IndexOf(WebSocketChannel *aChannel) michael@0: { michael@0: for (uint32_t i = 0; i < mQueue.Length(); i++) michael@0: if (aChannel == (mQueue[i])->mChannel) michael@0: return i; michael@0: return -1; michael@0: } michael@0: michael@0: // SessionCount might be decremented from the main or the socket michael@0: // thread, so manage it with atomic counters michael@0: Atomic mSessionCount; michael@0: michael@0: // Queue for websockets that have not completed connecting yet. michael@0: // The first nsOpenConn with a given address will be either be michael@0: // CONNECTING_IN_PROGRESS or CONNECTING_DELAYED. Later ones with the same michael@0: // hostname must be CONNECTING_QUEUED. michael@0: // michael@0: // We could hash hostnames instead of using a single big vector here, but the michael@0: // dataset is expected to be small. michael@0: nsTArray mQueue; michael@0: michael@0: FailDelayManager mFailures; michael@0: michael@0: static nsWSAdmissionManager *sManager; michael@0: static StaticMutex sLock; michael@0: }; michael@0: michael@0: nsWSAdmissionManager *nsWSAdmissionManager::sManager; michael@0: StaticMutex nsWSAdmissionManager::sLock; michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // CallOnMessageAvailable michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class CallOnMessageAvailable MOZ_FINAL : public nsIRunnable michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: CallOnMessageAvailable(WebSocketChannel *aChannel, michael@0: nsCString &aData, michael@0: int32_t aLen) michael@0: : mChannel(aChannel), michael@0: mData(aData), michael@0: mLen(aLen) {} michael@0: michael@0: NS_IMETHOD Run() michael@0: { michael@0: MOZ_ASSERT(NS_GetCurrentThread() == mChannel->mTargetThread); michael@0: michael@0: if (mLen < 0) michael@0: mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData); michael@0: else michael@0: mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData); michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: ~CallOnMessageAvailable() {} michael@0: michael@0: nsRefPtr mChannel; michael@0: nsCString mData; michael@0: int32_t mLen; michael@0: }; michael@0: NS_IMPL_ISUPPORTS(CallOnMessageAvailable, nsIRunnable) michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // CallOnStop michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class CallOnStop MOZ_FINAL : public nsIRunnable michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: CallOnStop(WebSocketChannel *aChannel, michael@0: nsresult aReason) michael@0: : mChannel(aChannel), michael@0: mReason(aReason) {} michael@0: michael@0: NS_IMETHOD Run() michael@0: { michael@0: MOZ_ASSERT(NS_GetCurrentThread() == mChannel->mTargetThread); michael@0: michael@0: nsWSAdmissionManager::OnStopSession(mChannel, mReason); michael@0: michael@0: if (mChannel->mListener) { michael@0: mChannel->mListener->OnStop(mChannel->mContext, mReason); michael@0: mChannel->mListener = nullptr; michael@0: mChannel->mContext = nullptr; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: ~CallOnStop() {} michael@0: michael@0: nsRefPtr mChannel; michael@0: nsresult mReason; michael@0: }; michael@0: NS_IMPL_ISUPPORTS(CallOnStop, nsIRunnable) michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // CallOnServerClose michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class CallOnServerClose MOZ_FINAL : public nsIRunnable michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: CallOnServerClose(WebSocketChannel *aChannel, michael@0: uint16_t aCode, michael@0: nsCString &aReason) michael@0: : mChannel(aChannel), michael@0: mCode(aCode), michael@0: mReason(aReason) {} michael@0: michael@0: NS_IMETHOD Run() michael@0: { michael@0: MOZ_ASSERT(NS_GetCurrentThread() == mChannel->mTargetThread); michael@0: michael@0: mChannel->mListener->OnServerClose(mChannel->mContext, mCode, mReason); michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: ~CallOnServerClose() {} michael@0: michael@0: nsRefPtr mChannel; michael@0: uint16_t mCode; michael@0: nsCString mReason; michael@0: }; michael@0: NS_IMPL_ISUPPORTS(CallOnServerClose, nsIRunnable) michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // CallAcknowledge michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class CallAcknowledge MOZ_FINAL : public nsIRunnable michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: CallAcknowledge(WebSocketChannel *aChannel, michael@0: uint32_t aSize) michael@0: : mChannel(aChannel), michael@0: mSize(aSize) {} michael@0: michael@0: NS_IMETHOD Run() michael@0: { michael@0: MOZ_ASSERT(NS_GetCurrentThread() == mChannel->mTargetThread); michael@0: michael@0: LOG(("WebSocketChannel::CallAcknowledge: Size %u\n", mSize)); michael@0: mChannel->mListener->OnAcknowledge(mChannel->mContext, mSize); michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: ~CallAcknowledge() {} michael@0: michael@0: nsRefPtr mChannel; michael@0: uint32_t mSize; michael@0: }; michael@0: NS_IMPL_ISUPPORTS(CallAcknowledge, nsIRunnable) michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // CallOnTransportAvailable michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class CallOnTransportAvailable MOZ_FINAL : public nsIRunnable michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: CallOnTransportAvailable(WebSocketChannel *aChannel, michael@0: nsISocketTransport *aTransport, michael@0: nsIAsyncInputStream *aSocketIn, michael@0: nsIAsyncOutputStream *aSocketOut) michael@0: : mChannel(aChannel), michael@0: mTransport(aTransport), michael@0: mSocketIn(aSocketIn), michael@0: mSocketOut(aSocketOut) {} michael@0: michael@0: NS_IMETHOD Run() michael@0: { michael@0: LOG(("WebSocketChannel::CallOnTransportAvailable %p\n", this)); michael@0: return mChannel->OnTransportAvailable(mTransport, mSocketIn, mSocketOut); michael@0: } michael@0: michael@0: private: michael@0: ~CallOnTransportAvailable() {} michael@0: michael@0: nsRefPtr mChannel; michael@0: nsCOMPtr mTransport; michael@0: nsCOMPtr mSocketIn; michael@0: nsCOMPtr mSocketOut; michael@0: }; michael@0: NS_IMPL_ISUPPORTS(CallOnTransportAvailable, nsIRunnable) michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // OutboundMessage michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: enum WsMsgType { michael@0: kMsgTypeString = 0, michael@0: kMsgTypeBinaryString, michael@0: kMsgTypeStream, michael@0: kMsgTypePing, michael@0: kMsgTypePong, michael@0: kMsgTypeFin michael@0: }; michael@0: michael@0: static const char* msgNames[] = { michael@0: "text", michael@0: "binaryString", michael@0: "binaryStream", michael@0: "ping", michael@0: "pong", michael@0: "close" michael@0: }; michael@0: michael@0: class OutboundMessage michael@0: { michael@0: public: michael@0: OutboundMessage(WsMsgType type, nsCString *str) michael@0: : mMsgType(type) michael@0: { michael@0: MOZ_COUNT_CTOR(OutboundMessage); michael@0: mMsg.pString = str; michael@0: mLength = str ? str->Length() : 0; michael@0: } michael@0: michael@0: OutboundMessage(nsIInputStream *stream, uint32_t length) michael@0: : mMsgType(kMsgTypeStream), mLength(length) michael@0: { michael@0: MOZ_COUNT_CTOR(OutboundMessage); michael@0: mMsg.pStream = stream; michael@0: mMsg.pStream->AddRef(); michael@0: } michael@0: michael@0: ~OutboundMessage() { michael@0: MOZ_COUNT_DTOR(OutboundMessage); michael@0: switch (mMsgType) { michael@0: case kMsgTypeString: michael@0: case kMsgTypeBinaryString: michael@0: case kMsgTypePing: michael@0: case kMsgTypePong: michael@0: delete mMsg.pString; michael@0: break; michael@0: case kMsgTypeStream: michael@0: // for now this only gets hit if msg deleted w/o being sent michael@0: if (mMsg.pStream) { michael@0: mMsg.pStream->Close(); michael@0: mMsg.pStream->Release(); michael@0: } michael@0: break; michael@0: case kMsgTypeFin: michael@0: break; // do-nothing: avoid compiler warning michael@0: } michael@0: } michael@0: michael@0: WsMsgType GetMsgType() const { return mMsgType; } michael@0: int32_t Length() const { return mLength; } michael@0: michael@0: uint8_t* BeginWriting() { michael@0: NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream, michael@0: "Stream should have been converted to string by now"); michael@0: return (uint8_t *)(mMsg.pString ? mMsg.pString->BeginWriting() : nullptr); michael@0: } michael@0: michael@0: uint8_t* BeginReading() { michael@0: NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream, michael@0: "Stream should have been converted to string by now"); michael@0: return (uint8_t *)(mMsg.pString ? mMsg.pString->BeginReading() : nullptr); michael@0: } michael@0: michael@0: nsresult ConvertStreamToString() michael@0: { michael@0: NS_ABORT_IF_FALSE(mMsgType == kMsgTypeStream, "Not a stream!"); michael@0: michael@0: #ifdef DEBUG michael@0: // Make sure we got correct length from Blob michael@0: uint64_t bytes; michael@0: mMsg.pStream->Available(&bytes); michael@0: NS_ASSERTION(bytes == mLength, "Stream length != blob length!"); michael@0: #endif michael@0: michael@0: nsAutoPtr temp(new nsCString()); michael@0: nsresult rv = NS_ReadInputStreamToString(mMsg.pStream, *temp, mLength); michael@0: michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: mMsg.pStream->Close(); michael@0: mMsg.pStream->Release(); michael@0: mMsg.pString = temp.forget(); michael@0: mMsgType = kMsgTypeBinaryString; michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: union { michael@0: nsCString *pString; michael@0: nsIInputStream *pStream; michael@0: } mMsg; michael@0: WsMsgType mMsgType; michael@0: uint32_t mLength; michael@0: }; michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // OutboundEnqueuer michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class OutboundEnqueuer MOZ_FINAL : public nsIRunnable michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: OutboundEnqueuer(WebSocketChannel *aChannel, OutboundMessage *aMsg) michael@0: : mChannel(aChannel), mMessage(aMsg) {} michael@0: michael@0: NS_IMETHOD Run() michael@0: { michael@0: mChannel->EnqueueOutgoingMessage(mChannel->mOutgoingMessages, mMessage); michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: ~OutboundEnqueuer() {} michael@0: michael@0: nsRefPtr mChannel; michael@0: OutboundMessage *mMessage; michael@0: }; michael@0: NS_IMPL_ISUPPORTS(OutboundEnqueuer, nsIRunnable) michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsWSCompression michael@0: // michael@0: // similar to nsDeflateConverter except for the mandatory FLUSH calls michael@0: // required by websocket and the absence of the deflate termination michael@0: // block which is appropriate because it would create data bytes after michael@0: // sending the websockets CLOSE message. michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class nsWSCompression michael@0: { michael@0: public: michael@0: nsWSCompression(nsIStreamListener *aListener, michael@0: nsISupports *aContext) michael@0: : mActive(false), michael@0: mContext(aContext), michael@0: mListener(aListener) michael@0: { michael@0: MOZ_COUNT_CTOR(nsWSCompression); michael@0: michael@0: mZlib.zalloc = allocator; michael@0: mZlib.zfree = destructor; michael@0: mZlib.opaque = Z_NULL; michael@0: michael@0: // Initialize the compressor - these are all the normal zlib michael@0: // defaults except window size is set to -15 instead of +15. michael@0: // This is the zlib way of specifying raw RFC 1951 output instead michael@0: // of the zlib rfc 1950 format which has a 2 byte header and michael@0: // adler checksum as a trailer michael@0: michael@0: nsresult rv; michael@0: mStream = do_CreateInstance(NS_STRINGINPUTSTREAM_CONTRACTID, &rv); michael@0: if (NS_SUCCEEDED(rv) && aContext && aListener && michael@0: deflateInit2(&mZlib, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, michael@0: Z_DEFAULT_STRATEGY) == Z_OK) { michael@0: mActive = true; michael@0: } michael@0: } michael@0: michael@0: ~nsWSCompression() michael@0: { michael@0: MOZ_COUNT_DTOR(nsWSCompression); michael@0: michael@0: if (mActive) michael@0: deflateEnd(&mZlib); michael@0: } michael@0: michael@0: bool Active() michael@0: { michael@0: return mActive; michael@0: } michael@0: michael@0: nsresult Deflate(uint8_t *buf1, uint32_t buf1Len, michael@0: uint8_t *buf2, uint32_t buf2Len) michael@0: { michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, michael@0: "not socket thread"); michael@0: NS_ABORT_IF_FALSE(mActive, "not active"); michael@0: michael@0: mZlib.avail_out = kBufferLen; michael@0: mZlib.next_out = mBuffer; michael@0: mZlib.avail_in = buf1Len; michael@0: mZlib.next_in = buf1; michael@0: michael@0: nsresult rv; michael@0: michael@0: while (mZlib.avail_in > 0) { michael@0: deflate(&mZlib, (buf2Len > 0) ? Z_NO_FLUSH : Z_SYNC_FLUSH); michael@0: rv = PushData(); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: mZlib.avail_out = kBufferLen; michael@0: mZlib.next_out = mBuffer; michael@0: } michael@0: michael@0: mZlib.avail_in = buf2Len; michael@0: mZlib.next_in = buf2; michael@0: michael@0: while (mZlib.avail_in > 0) { michael@0: deflate(&mZlib, Z_SYNC_FLUSH); michael@0: rv = PushData(); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: mZlib.avail_out = kBufferLen; michael@0: mZlib.next_out = mBuffer; michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: michael@0: // use zlib data types michael@0: static void *allocator(void *opaque, uInt items, uInt size) michael@0: { michael@0: return moz_xmalloc(items * size); michael@0: } michael@0: michael@0: static void destructor(void *opaque, void *addr) michael@0: { michael@0: moz_free(addr); michael@0: } michael@0: michael@0: nsresult PushData() michael@0: { michael@0: uint32_t bytesToWrite = kBufferLen - mZlib.avail_out; michael@0: if (bytesToWrite > 0) { michael@0: mStream->ShareData(reinterpret_cast(mBuffer), bytesToWrite); michael@0: nsresult rv = michael@0: mListener->OnDataAvailable(nullptr, mContext, mStream, 0, bytesToWrite); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: bool mActive; michael@0: z_stream mZlib; michael@0: nsCOMPtr mStream; michael@0: michael@0: nsISupports *mContext; /* weak ref */ michael@0: nsIStreamListener *mListener; /* weak ref */ michael@0: michael@0: const static int32_t kBufferLen = 4096; michael@0: uint8_t mBuffer[kBufferLen]; michael@0: }; michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // WebSocketChannel michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: uint32_t WebSocketChannel::sSerialSeed = 0; michael@0: michael@0: WebSocketChannel::WebSocketChannel() : michael@0: mPort(0), michael@0: mCloseTimeout(20000), michael@0: mOpenTimeout(20000), michael@0: mConnecting(NOT_CONNECTING), michael@0: mMaxConcurrentConnections(200), michael@0: mGotUpgradeOK(0), michael@0: mRecvdHttpUpgradeTransport(0), michael@0: mRequestedClose(0), michael@0: mClientClosed(0), michael@0: mServerClosed(0), michael@0: mStopped(0), michael@0: mCalledOnStop(0), michael@0: mPingOutstanding(0), michael@0: mAllowCompression(1), michael@0: mAutoFollowRedirects(0), michael@0: mReleaseOnTransmit(0), michael@0: mTCPClosed(0), michael@0: mOpenedHttpChannel(0), michael@0: mDataStarted(0), michael@0: mIncrementedSessionCount(0), michael@0: mDecrementedSessionCount(0), michael@0: mMaxMessageSize(INT32_MAX), michael@0: mStopOnClose(NS_OK), michael@0: mServerCloseCode(CLOSE_ABNORMAL), michael@0: mScriptCloseCode(0), michael@0: mFragmentOpcode(kContinuation), michael@0: mFragmentAccumulator(0), michael@0: mBuffered(0), michael@0: mBufferSize(kIncomingBufferInitialSize), michael@0: mCurrentOut(nullptr), michael@0: mCurrentOutSent(0), michael@0: mCompressor(nullptr), michael@0: mDynamicOutputSize(0), michael@0: mDynamicOutput(nullptr), michael@0: mPrivateBrowsing(false), michael@0: mConnectionLogService(nullptr), michael@0: mCountRecv(0), michael@0: mCountSent(0), michael@0: mAppId(NECKO_NO_APP_ID) michael@0: { michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: LOG(("WebSocketChannel::WebSocketChannel() %p\n", this)); michael@0: michael@0: nsWSAdmissionManager::Init(); michael@0: michael@0: mFramePtr = mBuffer = static_cast(moz_xmalloc(mBufferSize)); michael@0: michael@0: nsresult rv; michael@0: mConnectionLogService = do_GetService("@mozilla.org/network/dashboard;1",&rv); michael@0: if (NS_FAILED(rv)) michael@0: LOG(("Failed to initiate dashboard service.")); michael@0: michael@0: mSerial = sSerialSeed++; michael@0: } michael@0: michael@0: WebSocketChannel::~WebSocketChannel() michael@0: { michael@0: LOG(("WebSocketChannel::~WebSocketChannel() %p\n", this)); michael@0: michael@0: if (mWasOpened) { michael@0: MOZ_ASSERT(mCalledOnStop, "WebSocket was opened but OnStop was not called"); michael@0: MOZ_ASSERT(mStopped, "WebSocket was opened but never stopped"); michael@0: } michael@0: MOZ_ASSERT(!mCancelable, "DNS/Proxy Request still alive at destruction"); michael@0: MOZ_ASSERT(!mConnecting, "Should not be connecting in destructor"); michael@0: michael@0: moz_free(mBuffer); michael@0: moz_free(mDynamicOutput); michael@0: delete mCompressor; michael@0: delete mCurrentOut; michael@0: michael@0: while ((mCurrentOut = (OutboundMessage *) mOutgoingPingMessages.PopFront())) michael@0: delete mCurrentOut; michael@0: while ((mCurrentOut = (OutboundMessage *) mOutgoingPongMessages.PopFront())) michael@0: delete mCurrentOut; michael@0: while ((mCurrentOut = (OutboundMessage *) mOutgoingMessages.PopFront())) michael@0: delete mCurrentOut; michael@0: michael@0: nsCOMPtr mainThread; michael@0: nsIURI *forgettable; michael@0: NS_GetMainThread(getter_AddRefs(mainThread)); michael@0: michael@0: if (mURI) { michael@0: mURI.forget(&forgettable); michael@0: NS_ProxyRelease(mainThread, forgettable, false); michael@0: } michael@0: michael@0: if (mOriginalURI) { michael@0: mOriginalURI.forget(&forgettable); michael@0: NS_ProxyRelease(mainThread, forgettable, false); michael@0: } michael@0: michael@0: if (mListener) { michael@0: nsIWebSocketListener *forgettableListener; michael@0: mListener.forget(&forgettableListener); michael@0: NS_ProxyRelease(mainThread, forgettableListener, false); michael@0: } michael@0: michael@0: if (mContext) { michael@0: nsISupports *forgettableContext; michael@0: mContext.forget(&forgettableContext); michael@0: NS_ProxyRelease(mainThread, forgettableContext, false); michael@0: } michael@0: michael@0: if (mLoadGroup) { michael@0: nsILoadGroup *forgettableGroup; michael@0: mLoadGroup.forget(&forgettableGroup); michael@0: NS_ProxyRelease(mainThread, forgettableGroup, false); michael@0: } michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::Shutdown() michael@0: { michael@0: nsWSAdmissionManager::Shutdown(); michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::BeginOpen() michael@0: { michael@0: LOG(("WebSocketChannel::BeginOpen() %p\n", this)); michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: nsresult rv; michael@0: michael@0: // Important that we set CONNECTING_IN_PROGRESS before any call to michael@0: // AbortSession here: ensures that any remaining queued connection(s) are michael@0: // scheduled in OnStopSession michael@0: mConnecting = CONNECTING_IN_PROGRESS; michael@0: michael@0: if (mRedirectCallback) { michael@0: LOG(("WebSocketChannel::BeginOpen: Resuming Redirect\n")); michael@0: rv = mRedirectCallback->OnRedirectVerifyCallback(NS_OK); michael@0: mRedirectCallback = nullptr; michael@0: return; michael@0: } michael@0: michael@0: nsCOMPtr localChannel = do_QueryInterface(mChannel, &rv); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel::BeginOpen: cannot async open\n")); michael@0: AbortSession(NS_ERROR_UNEXPECTED); michael@0: return; michael@0: } michael@0: michael@0: if (localChannel) { michael@0: bool isInBrowser; michael@0: NS_GetAppInfo(localChannel, &mAppId, &isInBrowser); michael@0: } michael@0: michael@0: #ifdef MOZ_WIDGET_GONK michael@0: if (mAppId != NECKO_NO_APP_ID) { michael@0: nsCOMPtr activeNetwork; michael@0: GetActiveNetworkInterface(activeNetwork); michael@0: mActiveNetwork = michael@0: new nsMainThreadPtrHolder(activeNetwork); michael@0: } michael@0: #endif michael@0: michael@0: rv = localChannel->AsyncOpen(this, mHttpChannel); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel::BeginOpen: cannot async open\n")); michael@0: AbortSession(NS_ERROR_CONNECTION_REFUSED); michael@0: return; michael@0: } michael@0: mOpenedHttpChannel = 1; michael@0: michael@0: mOpenTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel::BeginOpen: cannot create open timer\n")); michael@0: AbortSession(NS_ERROR_UNEXPECTED); michael@0: return; michael@0: } michael@0: michael@0: rv = mOpenTimer->InitWithCallback(this, mOpenTimeout, michael@0: nsITimer::TYPE_ONE_SHOT); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel::BeginOpen: cannot initialize open timer\n")); michael@0: AbortSession(NS_ERROR_UNEXPECTED); michael@0: return; michael@0: } michael@0: } michael@0: michael@0: bool michael@0: WebSocketChannel::IsPersistentFramePtr() michael@0: { michael@0: return (mFramePtr >= mBuffer && mFramePtr < mBuffer + mBufferSize); michael@0: } michael@0: michael@0: // Extends the internal buffer by count and returns the total michael@0: // amount of data available for read michael@0: // michael@0: // Accumulated fragment size is passed in instead of using the member michael@0: // variable beacuse when transitioning from the stack to the persistent michael@0: // read buffer we want to explicitly include them in the buffer instead michael@0: // of as already existing data. michael@0: bool michael@0: WebSocketChannel::UpdateReadBuffer(uint8_t *buffer, uint32_t count, michael@0: uint32_t accumulatedFragments, michael@0: uint32_t *available) michael@0: { michael@0: LOG(("WebSocketChannel::UpdateReadBuffer() %p [%p %u]\n", michael@0: this, buffer, count)); michael@0: michael@0: if (!mBuffered) michael@0: mFramePtr = mBuffer; michael@0: michael@0: NS_ABORT_IF_FALSE(IsPersistentFramePtr(), "update read buffer bad mFramePtr"); michael@0: NS_ABORT_IF_FALSE(mFramePtr - accumulatedFragments >= mBuffer, michael@0: "reserved FramePtr bad"); michael@0: michael@0: if (mBuffered + count <= mBufferSize) { michael@0: // append to existing buffer michael@0: LOG(("WebSocketChannel: update read buffer absorbed %u\n", count)); michael@0: } else if (mBuffered + count - michael@0: (mFramePtr - accumulatedFragments - mBuffer) <= mBufferSize) { michael@0: // make room in existing buffer by shifting unused data to start michael@0: mBuffered -= (mFramePtr - mBuffer - accumulatedFragments); michael@0: LOG(("WebSocketChannel: update read buffer shifted %u\n", mBuffered)); michael@0: ::memmove(mBuffer, mFramePtr - accumulatedFragments, mBuffered); michael@0: mFramePtr = mBuffer + accumulatedFragments; michael@0: } else { michael@0: // existing buffer is not sufficient, extend it michael@0: mBufferSize += count + 8192 + mBufferSize/3; michael@0: LOG(("WebSocketChannel: update read buffer extended to %u\n", mBufferSize)); michael@0: uint8_t *old = mBuffer; michael@0: mBuffer = (uint8_t *)moz_realloc(mBuffer, mBufferSize); michael@0: if (!mBuffer) { michael@0: mBuffer = old; michael@0: return false; michael@0: } michael@0: mFramePtr = mBuffer + (mFramePtr - old); michael@0: } michael@0: michael@0: ::memcpy(mBuffer + mBuffered, buffer, count); michael@0: mBuffered += count; michael@0: michael@0: if (available) michael@0: *available = mBuffered - (mFramePtr - mBuffer); michael@0: michael@0: return true; michael@0: } michael@0: michael@0: nsresult michael@0: WebSocketChannel::ProcessInput(uint8_t *buffer, uint32_t count) michael@0: { michael@0: LOG(("WebSocketChannel::ProcessInput %p [%d %d]\n", this, count, mBuffered)); michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); michael@0: michael@0: // The purpose of ping/pong is to actively probe the peer so that an michael@0: // unreachable peer is not mistaken for a period of idleness. This michael@0: // implementation accepts any application level read activity as a sign of michael@0: // life, it does not necessarily have to be a pong. michael@0: ResetPingTimer(); michael@0: michael@0: uint32_t avail; michael@0: michael@0: if (!mBuffered) { michael@0: // Most of the time we can process right off the stack buffer without michael@0: // having to accumulate anything michael@0: mFramePtr = buffer; michael@0: avail = count; michael@0: } else { michael@0: if (!UpdateReadBuffer(buffer, count, mFragmentAccumulator, &avail)) { michael@0: return NS_ERROR_FILE_TOO_BIG; michael@0: } michael@0: } michael@0: michael@0: uint8_t *payload; michael@0: uint32_t totalAvail = avail; michael@0: michael@0: while (avail >= 2) { michael@0: int64_t payloadLength64 = mFramePtr[1] & 0x7F; michael@0: uint8_t finBit = mFramePtr[0] & kFinalFragBit; michael@0: uint8_t rsvBits = mFramePtr[0] & 0x70; michael@0: uint8_t maskBit = mFramePtr[1] & kMaskBit; michael@0: uint8_t opcode = mFramePtr[0] & 0x0F; michael@0: michael@0: uint32_t framingLength = 2; michael@0: if (maskBit) michael@0: framingLength += 4; michael@0: michael@0: if (payloadLength64 < 126) { michael@0: if (avail < framingLength) michael@0: break; michael@0: } else if (payloadLength64 == 126) { michael@0: // 16 bit length field michael@0: framingLength += 2; michael@0: if (avail < framingLength) michael@0: break; michael@0: michael@0: payloadLength64 = mFramePtr[2] << 8 | mFramePtr[3]; michael@0: } else { michael@0: // 64 bit length michael@0: framingLength += 8; michael@0: if (avail < framingLength) michael@0: break; michael@0: michael@0: if (mFramePtr[2] & 0x80) { michael@0: // Section 4.2 says that the most significant bit MUST be michael@0: // 0. (i.e. this is really a 63 bit value) michael@0: LOG(("WebSocketChannel:: high bit of 64 bit length set")); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: // copy this in case it is unaligned michael@0: payloadLength64 = NetworkEndian::readInt64(mFramePtr + 2); michael@0: } michael@0: michael@0: payload = mFramePtr + framingLength; michael@0: avail -= framingLength; michael@0: michael@0: LOG(("WebSocketChannel::ProcessInput: payload %lld avail %lu\n", michael@0: payloadLength64, avail)); michael@0: michael@0: if (payloadLength64 + mFragmentAccumulator > mMaxMessageSize) { michael@0: return NS_ERROR_FILE_TOO_BIG; michael@0: } michael@0: uint32_t payloadLength = static_cast(payloadLength64); michael@0: michael@0: if (avail < payloadLength) michael@0: break; michael@0: michael@0: LOG(("WebSocketChannel::ProcessInput: Frame accumulated - opcode %d\n", michael@0: opcode)); michael@0: michael@0: if (maskBit) { michael@0: // This is unexpected - the server does not generally send masked michael@0: // frames to the client, but it is allowed michael@0: LOG(("WebSocketChannel:: Client RECEIVING masked frame.")); michael@0: michael@0: uint32_t mask = NetworkEndian::readUint32(payload - 4); michael@0: ApplyMask(mask, payload, payloadLength); michael@0: } michael@0: michael@0: // Control codes are required to have the fin bit set michael@0: if (!finBit && (opcode & kControlFrameMask)) { michael@0: LOG(("WebSocketChannel:: fragmented control frame code %d\n", opcode)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: if (rsvBits) { michael@0: LOG(("WebSocketChannel:: unexpected reserved bits %x\n", rsvBits)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: if (!finBit || opcode == kContinuation) { michael@0: // This is part of a fragment response michael@0: michael@0: // Only the first frame has a non zero op code: Make sure we don't see a michael@0: // first frame while some old fragments are open michael@0: if ((mFragmentAccumulator != 0) && (opcode != kContinuation)) { michael@0: LOG(("WebSocketChannel:: nested fragments\n")); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: LOG(("WebSocketChannel:: Accumulating Fragment %ld\n", payloadLength)); michael@0: michael@0: if (opcode == kContinuation) { michael@0: michael@0: // Make sure this continuation fragment isn't the first fragment michael@0: if (mFragmentOpcode == kContinuation) { michael@0: LOG(("WebSocketHeandler:: continuation code in first fragment\n")); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: // For frag > 1 move the data body back on top of the headers michael@0: // so we have contiguous stream of data michael@0: NS_ABORT_IF_FALSE(mFramePtr + framingLength == payload, michael@0: "payload offset from frameptr wrong"); michael@0: ::memmove(mFramePtr, payload, avail); michael@0: payload = mFramePtr; michael@0: if (mBuffered) michael@0: mBuffered -= framingLength; michael@0: } else { michael@0: mFragmentOpcode = opcode; michael@0: } michael@0: michael@0: if (finBit) { michael@0: LOG(("WebSocketChannel:: Finalizing Fragment\n")); michael@0: payload -= mFragmentAccumulator; michael@0: payloadLength += mFragmentAccumulator; michael@0: avail += mFragmentAccumulator; michael@0: mFragmentAccumulator = 0; michael@0: opcode = mFragmentOpcode; michael@0: // reset to detect if next message illegally starts with continuation michael@0: mFragmentOpcode = kContinuation; michael@0: } else { michael@0: opcode = kContinuation; michael@0: mFragmentAccumulator += payloadLength; michael@0: } michael@0: } else if (mFragmentAccumulator != 0 && !(opcode & kControlFrameMask)) { michael@0: // This frame is not part of a fragment sequence but we michael@0: // have an open fragment.. it must be a control code or else michael@0: // we have a problem michael@0: LOG(("WebSocketChannel:: illegal fragment sequence\n")); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: if (mServerClosed) { michael@0: LOG(("WebSocketChannel:: ignoring read frame code %d after close\n", michael@0: opcode)); michael@0: // nop michael@0: } else if (mStopped) { michael@0: LOG(("WebSocketChannel:: ignoring read frame code %d after completion\n", michael@0: opcode)); michael@0: } else if (opcode == kText) { michael@0: LOG(("WebSocketChannel:: text frame received\n")); michael@0: if (mListener) { michael@0: nsCString utf8Data; michael@0: if (!utf8Data.Assign((const char *)payload, payloadLength, michael@0: mozilla::fallible_t())) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: michael@0: // Section 8.1 says to fail connection if invalid utf-8 in text message michael@0: if (!IsUTF8(utf8Data, false)) { michael@0: LOG(("WebSocketChannel:: text frame invalid utf-8\n")); michael@0: return NS_ERROR_CANNOT_CONVERT_DATA; michael@0: } michael@0: michael@0: mTargetThread->Dispatch(new CallOnMessageAvailable(this, utf8Data, -1), michael@0: NS_DISPATCH_NORMAL); michael@0: if (mConnectionLogService && !mPrivateBrowsing) { michael@0: mConnectionLogService->NewMsgReceived(mHost, mSerial, count); michael@0: LOG(("Added new msg received for %s", mHost.get())); michael@0: } michael@0: } michael@0: } else if (opcode & kControlFrameMask) { michael@0: // control frames michael@0: if (payloadLength > 125) { michael@0: LOG(("WebSocketChannel:: bad control frame code %d length %d\n", michael@0: opcode, payloadLength)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: if (opcode == kClose) { michael@0: LOG(("WebSocketChannel:: close received\n")); michael@0: mServerClosed = 1; michael@0: michael@0: mServerCloseCode = CLOSE_NO_STATUS; michael@0: if (payloadLength >= 2) { michael@0: mServerCloseCode = NetworkEndian::readUint16(payload); michael@0: LOG(("WebSocketChannel:: close recvd code %u\n", mServerCloseCode)); michael@0: uint16_t msglen = static_cast(payloadLength - 2); michael@0: if (msglen > 0) { michael@0: mServerCloseReason.SetLength(msglen); michael@0: memcpy(mServerCloseReason.BeginWriting(), michael@0: (const char *)payload + 2, msglen); michael@0: michael@0: // section 8.1 says to replace received non utf-8 sequences michael@0: // (which are non-conformant to send) with u+fffd, michael@0: // but secteam feels that silently rewriting messages is michael@0: // inappropriate - so we will fail the connection instead. michael@0: if (!IsUTF8(mServerCloseReason, false)) { michael@0: LOG(("WebSocketChannel:: close frame invalid utf-8\n")); michael@0: return NS_ERROR_CANNOT_CONVERT_DATA; michael@0: } michael@0: michael@0: LOG(("WebSocketChannel:: close msg %s\n", michael@0: mServerCloseReason.get())); michael@0: } michael@0: } michael@0: michael@0: if (mCloseTimer) { michael@0: mCloseTimer->Cancel(); michael@0: mCloseTimer = nullptr; michael@0: } michael@0: if (mListener) { michael@0: mTargetThread->Dispatch(new CallOnServerClose(this, mServerCloseCode, michael@0: mServerCloseReason), michael@0: NS_DISPATCH_NORMAL); michael@0: } michael@0: michael@0: if (mClientClosed) michael@0: ReleaseSession(); michael@0: } else if (opcode == kPing) { michael@0: LOG(("WebSocketChannel:: ping received\n")); michael@0: GeneratePong(payload, payloadLength); michael@0: } else if (opcode == kPong) { michael@0: // opcode kPong: the mere act of receiving the packet is all we need michael@0: // to do for the pong to trigger the activity timers michael@0: LOG(("WebSocketChannel:: pong received\n")); michael@0: } else { michael@0: /* unknown control frame opcode */ michael@0: LOG(("WebSocketChannel:: unknown control op code %d\n", opcode)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: if (mFragmentAccumulator) { michael@0: // Remove the control frame from the stream so we have a contiguous michael@0: // data buffer of reassembled fragments michael@0: LOG(("WebSocketChannel:: Removing Control From Read buffer\n")); michael@0: NS_ABORT_IF_FALSE(mFramePtr + framingLength == payload, michael@0: "payload offset from frameptr wrong"); michael@0: ::memmove(mFramePtr, payload + payloadLength, avail - payloadLength); michael@0: payload = mFramePtr; michael@0: avail -= payloadLength; michael@0: if (mBuffered) michael@0: mBuffered -= framingLength + payloadLength; michael@0: payloadLength = 0; michael@0: } michael@0: } else if (opcode == kBinary) { michael@0: LOG(("WebSocketChannel:: binary frame received\n")); michael@0: if (mListener) { michael@0: nsCString binaryData((const char *)payload, payloadLength); michael@0: mTargetThread->Dispatch(new CallOnMessageAvailable(this, binaryData, michael@0: payloadLength), michael@0: NS_DISPATCH_NORMAL); michael@0: // To add the header to 'Networking Dashboard' log michael@0: if (mConnectionLogService && !mPrivateBrowsing) { michael@0: mConnectionLogService->NewMsgReceived(mHost, mSerial, count); michael@0: LOG(("Added new received msg for %s", mHost.get())); michael@0: } michael@0: } michael@0: } else if (opcode != kContinuation) { michael@0: /* unknown opcode */ michael@0: LOG(("WebSocketChannel:: unknown op code %d\n", opcode)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: mFramePtr = payload + payloadLength; michael@0: avail -= payloadLength; michael@0: totalAvail = avail; michael@0: } michael@0: michael@0: // Adjust the stateful buffer. If we were operating off the stack and michael@0: // now have a partial message then transition to the buffer, or if michael@0: // we were working off the buffer but no longer have any active state michael@0: // then transition to the stack michael@0: if (!IsPersistentFramePtr()) { michael@0: mBuffered = 0; michael@0: michael@0: if (mFragmentAccumulator) { michael@0: LOG(("WebSocketChannel:: Setup Buffer due to fragment")); michael@0: michael@0: if (!UpdateReadBuffer(mFramePtr - mFragmentAccumulator, michael@0: totalAvail + mFragmentAccumulator, 0, nullptr)) { michael@0: return NS_ERROR_FILE_TOO_BIG; michael@0: } michael@0: michael@0: // UpdateReadBuffer will reset the frameptr to the beginning michael@0: // of new saved state, so we need to skip past processed framgents michael@0: mFramePtr += mFragmentAccumulator; michael@0: } else if (totalAvail) { michael@0: LOG(("WebSocketChannel:: Setup Buffer due to partial frame")); michael@0: if (!UpdateReadBuffer(mFramePtr, totalAvail, 0, nullptr)) { michael@0: return NS_ERROR_FILE_TOO_BIG; michael@0: } michael@0: } michael@0: } else if (!mFragmentAccumulator && !totalAvail) { michael@0: // If we were working off a saved buffer state and there is no partial michael@0: // frame or fragment in process, then revert to stack behavior michael@0: LOG(("WebSocketChannel:: Internal buffering not needed anymore")); michael@0: mBuffered = 0; michael@0: michael@0: // release memory if we've been processing a large message michael@0: if (mBufferSize > kIncomingBufferStableSize) { michael@0: mBufferSize = kIncomingBufferStableSize; michael@0: moz_free(mBuffer); michael@0: mBuffer = (uint8_t *)moz_xmalloc(mBufferSize); michael@0: } michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::ApplyMask(uint32_t mask, uint8_t *data, uint64_t len) michael@0: { michael@0: if (!data || len == 0) michael@0: return; michael@0: michael@0: // Optimally we want to apply the mask 32 bits at a time, michael@0: // but the buffer might not be alligned. So we first deal with michael@0: // 0 to 3 bytes of preamble individually michael@0: michael@0: while (len && (reinterpret_cast(data) & 3)) { michael@0: *data ^= mask >> 24; michael@0: mask = RotateLeft(mask, 8); michael@0: data++; michael@0: len--; michael@0: } michael@0: michael@0: // perform mask on full words of data michael@0: michael@0: uint32_t *iData = (uint32_t *) data; michael@0: uint32_t *end = iData + (len / 4); michael@0: NetworkEndian::writeUint32(&mask, mask); michael@0: for (; iData < end; iData++) michael@0: *iData ^= mask; michael@0: mask = NetworkEndian::readUint32(&mask); michael@0: data = (uint8_t *)iData; michael@0: len = len % 4; michael@0: michael@0: // There maybe up to 3 trailing bytes that need to be dealt with michael@0: // individually michael@0: michael@0: while (len) { michael@0: *data ^= mask >> 24; michael@0: mask = RotateLeft(mask, 8); michael@0: data++; michael@0: len--; michael@0: } michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::GeneratePing() michael@0: { michael@0: nsCString *buf = new nsCString(); michael@0: buf->Assign("PING"); michael@0: EnqueueOutgoingMessage(mOutgoingPingMessages, michael@0: new OutboundMessage(kMsgTypePing, buf)); michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::GeneratePong(uint8_t *payload, uint32_t len) michael@0: { michael@0: nsCString *buf = new nsCString(); michael@0: buf->SetLength(len); michael@0: if (buf->Length() < len) { michael@0: LOG(("WebSocketChannel::GeneratePong Allocation Failure\n")); michael@0: delete buf; michael@0: return; michael@0: } michael@0: michael@0: memcpy(buf->BeginWriting(), payload, len); michael@0: EnqueueOutgoingMessage(mOutgoingPongMessages, michael@0: new OutboundMessage(kMsgTypePong, buf)); michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::EnqueueOutgoingMessage(nsDeque &aQueue, michael@0: OutboundMessage *aMsg) michael@0: { michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); michael@0: michael@0: LOG(("WebSocketChannel::EnqueueOutgoingMessage %p " michael@0: "queueing msg %p [type=%s len=%d]\n", michael@0: this, aMsg, msgNames[aMsg->GetMsgType()], aMsg->Length())); michael@0: michael@0: aQueue.Push(aMsg); michael@0: OnOutputStreamReady(mSocketOut); michael@0: } michael@0: michael@0: michael@0: uint16_t michael@0: WebSocketChannel::ResultToCloseCode(nsresult resultCode) michael@0: { michael@0: if (NS_SUCCEEDED(resultCode)) michael@0: return CLOSE_NORMAL; michael@0: michael@0: switch (resultCode) { michael@0: case NS_ERROR_FILE_TOO_BIG: michael@0: case NS_ERROR_OUT_OF_MEMORY: michael@0: return CLOSE_TOO_LARGE; michael@0: case NS_ERROR_CANNOT_CONVERT_DATA: michael@0: return CLOSE_INVALID_PAYLOAD; michael@0: case NS_ERROR_UNEXPECTED: michael@0: return CLOSE_INTERNAL_ERROR; michael@0: default: michael@0: return CLOSE_PROTOCOL_ERROR; michael@0: } michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::PrimeNewOutgoingMessage() michael@0: { michael@0: LOG(("WebSocketChannel::PrimeNewOutgoingMessage() %p\n", this)); michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); michael@0: NS_ABORT_IF_FALSE(!mCurrentOut, "Current message in progress"); michael@0: michael@0: nsresult rv = NS_OK; michael@0: michael@0: mCurrentOut = (OutboundMessage *)mOutgoingPongMessages.PopFront(); michael@0: if (mCurrentOut) { michael@0: NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePong, michael@0: "Not pong message!"); michael@0: } else { michael@0: mCurrentOut = (OutboundMessage *)mOutgoingPingMessages.PopFront(); michael@0: if (mCurrentOut) michael@0: NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePing, michael@0: "Not ping message!"); michael@0: else michael@0: mCurrentOut = (OutboundMessage *)mOutgoingMessages.PopFront(); michael@0: } michael@0: michael@0: if (!mCurrentOut) michael@0: return; michael@0: michael@0: WsMsgType msgType = mCurrentOut->GetMsgType(); michael@0: michael@0: LOG(("WebSocketChannel::PrimeNewOutgoingMessage " michael@0: "%p found queued msg %p [type=%s len=%d]\n", michael@0: this, mCurrentOut, msgNames[msgType], mCurrentOut->Length())); michael@0: michael@0: mCurrentOutSent = 0; michael@0: mHdrOut = mOutHeader; michael@0: michael@0: uint8_t *payload = nullptr; michael@0: michael@0: if (msgType == kMsgTypeFin) { michael@0: // This is a demand to create a close message michael@0: if (mClientClosed) { michael@0: DeleteCurrentOutGoingMessage(); michael@0: PrimeNewOutgoingMessage(); michael@0: return; michael@0: } michael@0: michael@0: mClientClosed = 1; michael@0: mOutHeader[0] = kFinalFragBit | kClose; michael@0: mOutHeader[1] = kMaskBit; michael@0: michael@0: // payload is offset 6 including 4 for the mask michael@0: payload = mOutHeader + 6; michael@0: michael@0: // The close reason code sits in the first 2 bytes of payload michael@0: // If the channel user provided a code and reason during Close() michael@0: // and there isn't an internal error, use that. michael@0: if (NS_SUCCEEDED(mStopOnClose)) { michael@0: if (mScriptCloseCode) { michael@0: NetworkEndian::writeUint16(payload, mScriptCloseCode); michael@0: mOutHeader[1] += 2; michael@0: mHdrOutToSend = 8; michael@0: if (!mScriptCloseReason.IsEmpty()) { michael@0: NS_ABORT_IF_FALSE(mScriptCloseReason.Length() <= 123, michael@0: "Close Reason Too Long"); michael@0: mOutHeader[1] += mScriptCloseReason.Length(); michael@0: mHdrOutToSend += mScriptCloseReason.Length(); michael@0: memcpy (payload + 2, michael@0: mScriptCloseReason.BeginReading(), michael@0: mScriptCloseReason.Length()); michael@0: } michael@0: } else { michael@0: // No close code/reason, so payload length = 0. We must still send mask michael@0: // even though it's not used. Keep payload offset so we write mask michael@0: // below. michael@0: mHdrOutToSend = 6; michael@0: } michael@0: } else { michael@0: NetworkEndian::writeUint16(payload, ResultToCloseCode(mStopOnClose)); michael@0: mOutHeader[1] += 2; michael@0: mHdrOutToSend = 8; michael@0: } michael@0: michael@0: if (mServerClosed) { michael@0: /* bidi close complete */ michael@0: mReleaseOnTransmit = 1; michael@0: } else if (NS_FAILED(mStopOnClose)) { michael@0: /* result of abort session - give up */ michael@0: StopSession(mStopOnClose); michael@0: } else { michael@0: /* wait for reciprocal close from server */ michael@0: mCloseTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mCloseTimer->InitWithCallback(this, mCloseTimeout, michael@0: nsITimer::TYPE_ONE_SHOT); michael@0: } else { michael@0: StopSession(rv); michael@0: } michael@0: } michael@0: } else { michael@0: switch (msgType) { michael@0: case kMsgTypePong: michael@0: mOutHeader[0] = kFinalFragBit | kPong; michael@0: break; michael@0: case kMsgTypePing: michael@0: mOutHeader[0] = kFinalFragBit | kPing; michael@0: break; michael@0: case kMsgTypeString: michael@0: mOutHeader[0] = kFinalFragBit | kText; michael@0: break; michael@0: case kMsgTypeStream: michael@0: // HACK ALERT: read in entire stream into string. michael@0: // Will block socket transport thread if file is blocking. michael@0: // TODO: bug 704447: don't block socket thread! michael@0: rv = mCurrentOut->ConvertStreamToString(); michael@0: if (NS_FAILED(rv)) { michael@0: AbortSession(NS_ERROR_FILE_TOO_BIG); michael@0: return; michael@0: } michael@0: // Now we're a binary string michael@0: msgType = kMsgTypeBinaryString; michael@0: michael@0: // no break: fall down into binary string case michael@0: michael@0: case kMsgTypeBinaryString: michael@0: mOutHeader[0] = kFinalFragBit | kBinary; michael@0: break; michael@0: case kMsgTypeFin: michael@0: NS_ABORT_IF_FALSE(false, "unreachable"); // avoid compiler warning michael@0: break; michael@0: } michael@0: michael@0: if (mCurrentOut->Length() < 126) { michael@0: mOutHeader[1] = mCurrentOut->Length() | kMaskBit; michael@0: mHdrOutToSend = 6; michael@0: } else if (mCurrentOut->Length() <= 0xffff) { michael@0: mOutHeader[1] = 126 | kMaskBit; michael@0: NetworkEndian::writeUint16(mOutHeader + sizeof(uint16_t), michael@0: mCurrentOut->Length()); michael@0: mHdrOutToSend = 8; michael@0: } else { michael@0: mOutHeader[1] = 127 | kMaskBit; michael@0: NetworkEndian::writeUint64(mOutHeader + 2, mCurrentOut->Length()); michael@0: mHdrOutToSend = 14; michael@0: } michael@0: payload = mOutHeader + mHdrOutToSend; michael@0: } michael@0: michael@0: NS_ABORT_IF_FALSE(payload, "payload offset not found"); michael@0: michael@0: // Perform the sending mask. Never use a zero mask michael@0: uint32_t mask; michael@0: do { michael@0: uint8_t *buffer; michael@0: nsresult rv = mRandomGenerator->GenerateRandomBytes(4, &buffer); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel::PrimeNewOutgoingMessage(): " michael@0: "GenerateRandomBytes failure %x\n", rv)); michael@0: StopSession(rv); michael@0: return; michael@0: } michael@0: mask = * reinterpret_cast(buffer); michael@0: NS_Free(buffer); michael@0: } while (!mask); michael@0: NetworkEndian::writeUint32(payload - sizeof(uint32_t), mask); michael@0: michael@0: LOG(("WebSocketChannel::PrimeNewOutgoingMessage() using mask %08x\n", mask)); michael@0: michael@0: // We don't mask the framing, but occasionally we stick a little payload michael@0: // data in the buffer used for the framing. Close frames are the current michael@0: // example. This data needs to be masked, but it is never more than a michael@0: // handful of bytes and might rotate the mask, so we can just do it locally. michael@0: // For real data frames we ship the bulk of the payload off to ApplyMask() michael@0: michael@0: while (payload < (mOutHeader + mHdrOutToSend)) { michael@0: *payload ^= mask >> 24; michael@0: mask = RotateLeft(mask, 8); michael@0: payload++; michael@0: } michael@0: michael@0: // Mask the real message payloads michael@0: michael@0: ApplyMask(mask, mCurrentOut->BeginWriting(), mCurrentOut->Length()); michael@0: michael@0: int32_t len = mCurrentOut->Length(); michael@0: michael@0: // for small frames, copy it all together for a contiguous write michael@0: if (len && len <= kCopyBreak) { michael@0: memcpy(mOutHeader + mHdrOutToSend, mCurrentOut->BeginWriting(), len); michael@0: mHdrOutToSend += len; michael@0: mCurrentOutSent = len; michael@0: } michael@0: michael@0: if (len && mCompressor) { michael@0: // assume a 1/3 reduction in size for sizing the buffer michael@0: // the buffer is used multiple times if necessary michael@0: uint32_t currentHeaderSize = mHdrOutToSend; michael@0: mHdrOutToSend = 0; michael@0: michael@0: EnsureHdrOut(32 + (currentHeaderSize + len - mCurrentOutSent) / 2 * 3); michael@0: mCompressor->Deflate(mOutHeader, currentHeaderSize, michael@0: mCurrentOut->BeginReading() + mCurrentOutSent, michael@0: len - mCurrentOutSent); michael@0: michael@0: // All of the compressed data now resides in {mHdrOut, mHdrOutToSend} michael@0: // so do not send the body again michael@0: mCurrentOutSent = len; michael@0: } michael@0: michael@0: // Transmitting begins - mHdrOutToSend bytes from mOutHeader and michael@0: // mCurrentOut->Length() bytes from mCurrentOut. The latter may be michael@0: // coaleseced into the former for small messages or as the result of the michael@0: // compression process, michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::DeleteCurrentOutGoingMessage() michael@0: { michael@0: delete mCurrentOut; michael@0: mCurrentOut = nullptr; michael@0: mCurrentOutSent = 0; michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::EnsureHdrOut(uint32_t size) michael@0: { michael@0: LOG(("WebSocketChannel::EnsureHdrOut() %p [%d]\n", this, size)); michael@0: michael@0: if (mDynamicOutputSize < size) { michael@0: mDynamicOutputSize = size; michael@0: mDynamicOutput = michael@0: (uint8_t *) moz_xrealloc(mDynamicOutput, mDynamicOutputSize); michael@0: } michael@0: michael@0: mHdrOut = mDynamicOutput; michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::CleanupConnection() michael@0: { michael@0: LOG(("WebSocketChannel::CleanupConnection() %p", this)); michael@0: michael@0: if (mLingeringCloseTimer) { michael@0: mLingeringCloseTimer->Cancel(); michael@0: mLingeringCloseTimer = nullptr; michael@0: } michael@0: michael@0: if (mSocketIn) { michael@0: mSocketIn->AsyncWait(nullptr, 0, 0, nullptr); michael@0: mSocketIn = nullptr; michael@0: } michael@0: michael@0: if (mSocketOut) { michael@0: mSocketOut->AsyncWait(nullptr, 0, 0, nullptr); michael@0: mSocketOut = nullptr; michael@0: } michael@0: michael@0: if (mTransport) { michael@0: mTransport->SetSecurityCallbacks(nullptr); michael@0: mTransport->SetEventSink(nullptr, nullptr); michael@0: mTransport->Close(NS_BASE_STREAM_CLOSED); michael@0: mTransport = nullptr; michael@0: } michael@0: michael@0: if (mConnectionLogService && !mPrivateBrowsing) { michael@0: mConnectionLogService->RemoveHost(mHost, mSerial); michael@0: } michael@0: michael@0: DecrementSessionCount(); michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::StopSession(nsresult reason) michael@0: { michael@0: LOG(("WebSocketChannel::StopSession() %p [%x]\n", this, reason)); michael@0: michael@0: // normally this should be called on socket thread, but it is ok to call it michael@0: // from OnStartRequest before the socket thread machine has gotten underway michael@0: michael@0: mStopped = 1; michael@0: michael@0: if (!mOpenedHttpChannel) { michael@0: // The HTTP channel information will never be used in this case michael@0: mChannel = nullptr; michael@0: mHttpChannel = nullptr; michael@0: mLoadGroup = nullptr; michael@0: mCallbacks = nullptr; michael@0: } michael@0: michael@0: if (mCloseTimer) { michael@0: mCloseTimer->Cancel(); michael@0: mCloseTimer = nullptr; michael@0: } michael@0: michael@0: if (mOpenTimer) { michael@0: mOpenTimer->Cancel(); michael@0: mOpenTimer = nullptr; michael@0: } michael@0: michael@0: if (mReconnectDelayTimer) { michael@0: mReconnectDelayTimer->Cancel(); michael@0: mReconnectDelayTimer = nullptr; michael@0: } michael@0: michael@0: if (mPingTimer) { michael@0: mPingTimer->Cancel(); michael@0: mPingTimer = nullptr; michael@0: } michael@0: michael@0: if (mSocketIn && !mTCPClosed) { michael@0: // Drain, within reason, this socket. if we leave any data michael@0: // unconsumed (including the tcp fin) a RST will be generated michael@0: // The right thing to do here is shutdown(SHUT_WR) and then wait michael@0: // a little while to see if any data comes in.. but there is no michael@0: // reason to delay things for that when the websocket handshake michael@0: // is supposed to guarantee a quiet connection except for that fin. michael@0: michael@0: char buffer[512]; michael@0: uint32_t count = 0; michael@0: uint32_t total = 0; michael@0: nsresult rv; michael@0: do { michael@0: total += count; michael@0: rv = mSocketIn->Read(buffer, 512, &count); michael@0: if (rv != NS_BASE_STREAM_WOULD_BLOCK && michael@0: (NS_FAILED(rv) || count == 0)) michael@0: mTCPClosed = true; michael@0: } while (NS_SUCCEEDED(rv) && count > 0 && total < 32000); michael@0: } michael@0: michael@0: int32_t sessionCount = kLingeringCloseThreshold; michael@0: nsWSAdmissionManager::GetSessionCount(sessionCount); michael@0: michael@0: if (!mTCPClosed && mTransport && sessionCount < kLingeringCloseThreshold) { michael@0: michael@0: // 7.1.1 says that the client SHOULD wait for the server to close the TCP michael@0: // connection. This is so we can reuse port numbers before 2 MSL expires, michael@0: // which is not really as much of a concern for us as the amount of state michael@0: // that might be accrued by keeping this channel object around waiting for michael@0: // the server. We handle the SHOULD by waiting a short time in the common michael@0: // case, but not waiting in the case of high concurrency. michael@0: // michael@0: // Normally this will be taken care of in AbortSession() after mTCPClosed michael@0: // is set when the server close arrives without waiting for the timeout to michael@0: // expire. michael@0: michael@0: LOG(("WebSocketChannel::StopSession: Wait for Server TCP close")); michael@0: michael@0: nsresult rv; michael@0: mLingeringCloseTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); michael@0: if (NS_SUCCEEDED(rv)) michael@0: mLingeringCloseTimer->InitWithCallback(this, kLingeringCloseTimeout, michael@0: nsITimer::TYPE_ONE_SHOT); michael@0: else michael@0: CleanupConnection(); michael@0: } else { michael@0: CleanupConnection(); michael@0: } michael@0: michael@0: if (mCancelable) { michael@0: mCancelable->Cancel(NS_ERROR_UNEXPECTED); michael@0: mCancelable = nullptr; michael@0: } michael@0: michael@0: mInflateReader = nullptr; michael@0: mInflateStream = nullptr; michael@0: michael@0: delete mCompressor; michael@0: mCompressor = nullptr; michael@0: michael@0: if (!mCalledOnStop) { michael@0: mCalledOnStop = 1; michael@0: mTargetThread->Dispatch(new CallOnStop(this, reason), michael@0: NS_DISPATCH_NORMAL); michael@0: } michael@0: michael@0: return; michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::AbortSession(nsresult reason) michael@0: { michael@0: LOG(("WebSocketChannel::AbortSession() %p [reason %x] stopped = %d\n", michael@0: this, reason, mStopped)); michael@0: michael@0: // normally this should be called on socket thread, but it is ok to call it michael@0: // from the main thread before StartWebsocketData() has completed michael@0: michael@0: // When we are failing we need to close the TCP connection immediately michael@0: // as per 7.1.1 michael@0: mTCPClosed = true; michael@0: michael@0: if (mLingeringCloseTimer) { michael@0: NS_ABORT_IF_FALSE(mStopped, "Lingering without Stop"); michael@0: LOG(("WebSocketChannel:: Cleanup connection based on TCP Close")); michael@0: CleanupConnection(); michael@0: return; michael@0: } michael@0: michael@0: if (mStopped) michael@0: return; michael@0: mStopped = 1; michael@0: michael@0: if (mTransport && reason != NS_BASE_STREAM_CLOSED && michael@0: !mRequestedClose && !mClientClosed && !mServerClosed) { michael@0: mRequestedClose = 1; michael@0: mStopOnClose = reason; michael@0: mSocketThread->Dispatch( michael@0: new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nullptr)), michael@0: nsIEventTarget::DISPATCH_NORMAL); michael@0: } else { michael@0: StopSession(reason); michael@0: } michael@0: } michael@0: michael@0: // ReleaseSession is called on orderly shutdown michael@0: void michael@0: WebSocketChannel::ReleaseSession() michael@0: { michael@0: LOG(("WebSocketChannel::ReleaseSession() %p stopped = %d\n", michael@0: this, mStopped)); michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); michael@0: michael@0: if (mStopped) michael@0: return; michael@0: StopSession(NS_OK); michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::IncrementSessionCount() michael@0: { michael@0: if (!mIncrementedSessionCount) { michael@0: nsWSAdmissionManager::IncrementSessionCount(); michael@0: mIncrementedSessionCount = 1; michael@0: } michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::DecrementSessionCount() michael@0: { michael@0: // Make sure we decrement session count only once, and only if we incremented it. michael@0: // This code is thread-safe: sWebSocketAdmissions->DecrementSessionCount is michael@0: // atomic, and mIncrementedSessionCount/mDecrementedSessionCount are set at michael@0: // times when they'll never be a race condition for checking/setting them. michael@0: if (mIncrementedSessionCount && !mDecrementedSessionCount) { michael@0: nsWSAdmissionManager::DecrementSessionCount(); michael@0: mDecrementedSessionCount = 1; michael@0: } michael@0: } michael@0: michael@0: nsresult michael@0: WebSocketChannel::HandleExtensions() michael@0: { michael@0: LOG(("WebSocketChannel::HandleExtensions() %p\n", this)); michael@0: michael@0: nsresult rv; michael@0: nsAutoCString extensions; michael@0: michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: rv = mHttpChannel->GetResponseHeader( michael@0: NS_LITERAL_CSTRING("Sec-WebSocket-Extensions"), extensions); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: if (!extensions.IsEmpty()) { michael@0: if (!extensions.Equals(NS_LITERAL_CSTRING("deflate-stream"))) { michael@0: LOG(("WebSocketChannel::OnStartRequest: " michael@0: "HTTP Sec-WebSocket-Exensions negotiated unknown value %s\n", michael@0: extensions.get())); michael@0: AbortSession(NS_ERROR_ILLEGAL_VALUE); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: if (!mAllowCompression) { michael@0: LOG(("WebSocketChannel::HandleExtensions: " michael@0: "Recvd Compression Extension that wasn't offered\n")); michael@0: AbortSession(NS_ERROR_ILLEGAL_VALUE); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: nsCOMPtr serv = michael@0: do_GetService(NS_STREAMCONVERTERSERVICE_CONTRACTID, &rv); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel:: Cannot find compression service\n")); michael@0: AbortSession(NS_ERROR_UNEXPECTED); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: rv = serv->AsyncConvertData("deflate", "uncompressed", this, nullptr, michael@0: getter_AddRefs(mInflateReader)); michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel:: Cannot find inflate listener\n")); michael@0: AbortSession(NS_ERROR_UNEXPECTED); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: mInflateStream = do_CreateInstance(NS_STRINGINPUTSTREAM_CONTRACTID, &rv); michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel:: Cannot find inflate stream\n")); michael@0: AbortSession(NS_ERROR_UNEXPECTED); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: mCompressor = new nsWSCompression(this, mSocketOut); michael@0: if (!mCompressor->Active()) { michael@0: LOG(("WebSocketChannel:: Cannot init deflate object\n")); michael@0: delete mCompressor; michael@0: mCompressor = nullptr; michael@0: AbortSession(NS_ERROR_UNEXPECTED); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: mNegotiatedExtensions = extensions; michael@0: } michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: WebSocketChannel::SetupRequest() michael@0: { michael@0: LOG(("WebSocketChannel::SetupRequest() %p\n", this)); michael@0: michael@0: nsresult rv; michael@0: michael@0: if (mLoadGroup) { michael@0: rv = mHttpChannel->SetLoadGroup(mLoadGroup); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: } michael@0: michael@0: rv = mHttpChannel->SetLoadFlags(nsIRequest::LOAD_BACKGROUND | michael@0: nsIRequest::INHIBIT_CACHING | michael@0: nsIRequest::LOAD_BYPASS_CACHE); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: // we never let websockets be blocked by head CSS/JS loads to avoid michael@0: // potential deadlock where server generation of CSS/JS requires michael@0: // an XHR signal. michael@0: rv = mChannel->SetLoadUnblocked(true); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: // draft-ietf-hybi-thewebsocketprotocol-07 illustrates Upgrade: websocket michael@0: // in lower case, so go with that. It is technically case insensitive. michael@0: rv = mChannel->HTTPUpgrade(NS_LITERAL_CSTRING("websocket"), this); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: mHttpChannel->SetRequestHeader( michael@0: NS_LITERAL_CSTRING("Sec-WebSocket-Version"), michael@0: NS_LITERAL_CSTRING(SEC_WEBSOCKET_VERSION), false); michael@0: michael@0: if (!mOrigin.IsEmpty()) michael@0: mHttpChannel->SetRequestHeader(NS_LITERAL_CSTRING("Origin"), mOrigin, michael@0: false); michael@0: michael@0: if (!mProtocol.IsEmpty()) michael@0: mHttpChannel->SetRequestHeader(NS_LITERAL_CSTRING("Sec-WebSocket-Protocol"), michael@0: mProtocol, true); michael@0: michael@0: if (mAllowCompression) michael@0: mHttpChannel->SetRequestHeader(NS_LITERAL_CSTRING("Sec-WebSocket-Extensions"), michael@0: NS_LITERAL_CSTRING("deflate-stream"), michael@0: false); michael@0: michael@0: uint8_t *secKey; michael@0: nsAutoCString secKeyString; michael@0: michael@0: rv = mRandomGenerator->GenerateRandomBytes(16, &secKey); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: char* b64 = PL_Base64Encode((const char *)secKey, 16, nullptr); michael@0: NS_Free(secKey); michael@0: if (!b64) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: secKeyString.Assign(b64); michael@0: PR_Free(b64); michael@0: mHttpChannel->SetRequestHeader(NS_LITERAL_CSTRING("Sec-WebSocket-Key"), michael@0: secKeyString, false); michael@0: LOG(("WebSocketChannel::SetupRequest: client key %s\n", secKeyString.get())); michael@0: michael@0: // prepare the value we expect to see in michael@0: // the sec-websocket-accept response header michael@0: secKeyString.AppendLiteral("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); michael@0: nsCOMPtr hasher = michael@0: do_CreateInstance(NS_CRYPTO_HASH_CONTRACTID, &rv); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: rv = hasher->Init(nsICryptoHash::SHA1); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: rv = hasher->Update((const uint8_t *) secKeyString.BeginWriting(), michael@0: secKeyString.Length()); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: rv = hasher->Finish(true, mHashedSecret); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: LOG(("WebSocketChannel::SetupRequest: expected server key %s\n", michael@0: mHashedSecret.get())); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: WebSocketChannel::DoAdmissionDNS() michael@0: { michael@0: nsresult rv; michael@0: michael@0: nsCString hostName; michael@0: rv = mURI->GetHost(hostName); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: mAddress = hostName; michael@0: rv = mURI->GetPort(&mPort); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: if (mPort == -1) michael@0: mPort = (mEncrypted ? kDefaultWSSPort : kDefaultWSPort); michael@0: nsCOMPtr dns = do_GetService(NS_DNSSERVICE_CONTRACTID, &rv); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: nsCOMPtr mainThread; michael@0: NS_GetMainThread(getter_AddRefs(mainThread)); michael@0: MOZ_ASSERT(!mCancelable); michael@0: return dns->AsyncResolve(hostName, 0, this, mainThread, getter_AddRefs(mCancelable)); michael@0: } michael@0: michael@0: nsresult michael@0: WebSocketChannel::ApplyForAdmission() michael@0: { michael@0: LOG(("WebSocketChannel::ApplyForAdmission() %p\n", this)); michael@0: michael@0: // Websockets has a policy of 1 session at a time being allowed in the michael@0: // CONNECTING state per server IP address (not hostname) michael@0: michael@0: // Check to see if a proxy is being used before making DNS call michael@0: nsCOMPtr pps = michael@0: do_GetService(NS_PROTOCOLPROXYSERVICE_CONTRACTID); michael@0: michael@0: if (!pps) { michael@0: // go straight to DNS michael@0: // expect the callback in ::OnLookupComplete michael@0: LOG(("WebSocketChannel::ApplyForAdmission: checking for concurrent open\n")); michael@0: return DoAdmissionDNS(); michael@0: } michael@0: michael@0: MOZ_ASSERT(!mCancelable); michael@0: michael@0: return pps->AsyncResolve(mHttpChannel, michael@0: nsIProtocolProxyService::RESOLVE_PREFER_HTTPS_PROXY | michael@0: nsIProtocolProxyService::RESOLVE_ALWAYS_TUNNEL, michael@0: this, getter_AddRefs(mCancelable)); michael@0: } michael@0: michael@0: // Called after both OnStartRequest and OnTransportAvailable have michael@0: // executed. This essentially ends the handshake and starts the websockets michael@0: // protocol state machine. michael@0: nsresult michael@0: WebSocketChannel::StartWebsocketData() michael@0: { michael@0: LOG(("WebSocketChannel::StartWebsocketData() %p", this)); michael@0: NS_ABORT_IF_FALSE(!mDataStarted, "StartWebsocketData twice"); michael@0: mDataStarted = 1; michael@0: michael@0: // We're now done CONNECTING, which means we can now open another, michael@0: // perhaps parallel, connection to the same host if one michael@0: // is pending michael@0: nsWSAdmissionManager::OnConnected(this); michael@0: michael@0: LOG(("WebSocketChannel::StartWebsocketData Notifying Listener %p\n", michael@0: mListener.get())); michael@0: michael@0: if (mListener) michael@0: mListener->OnStart(mContext); michael@0: michael@0: // Start keepalive ping timer, if we're using keepalive. michael@0: if (mPingInterval) { michael@0: nsresult rv; michael@0: mPingTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); michael@0: if (NS_FAILED(rv)) { michael@0: NS_WARNING("unable to create ping timer. Carrying on."); michael@0: } else { michael@0: LOG(("WebSocketChannel will generate ping after %d ms of receive silence\n", michael@0: mPingInterval)); michael@0: mPingTimer->SetTarget(mSocketThread); michael@0: mPingTimer->InitWithCallback(this, mPingInterval, nsITimer::TYPE_ONE_SHOT); michael@0: } michael@0: } michael@0: michael@0: return mSocketIn->AsyncWait(this, 0, 0, mSocketThread); michael@0: } michael@0: michael@0: void michael@0: WebSocketChannel::ReportConnectionTelemetry() michael@0: { michael@0: // 3 bits are used. high bit is for wss, middle bit for failed, michael@0: // and low bit for proxy.. michael@0: // 0 - 7 : ws-ok-plain, ws-ok-proxy, ws-failed-plain, ws-failed-proxy, michael@0: // wss-ok-plain, wss-ok-proxy, wss-failed-plain, wss-failed-proxy michael@0: michael@0: bool didProxy = false; michael@0: michael@0: nsCOMPtr pi; michael@0: nsCOMPtr pc = do_QueryInterface(mChannel); michael@0: if (pc) michael@0: pc->GetProxyInfo(getter_AddRefs(pi)); michael@0: if (pi) { michael@0: nsAutoCString proxyType; michael@0: pi->GetType(proxyType); michael@0: if (!proxyType.IsEmpty() && michael@0: !proxyType.Equals(NS_LITERAL_CSTRING("direct"))) michael@0: didProxy = true; michael@0: } michael@0: michael@0: uint8_t value = (mEncrypted ? (1 << 2) : 0) | michael@0: (!mGotUpgradeOK ? (1 << 1) : 0) | michael@0: (didProxy ? (1 << 0) : 0); michael@0: michael@0: LOG(("WebSocketChannel::ReportConnectionTelemetry() %p %d", this, value)); michael@0: Telemetry::Accumulate(Telemetry::WEBSOCKETS_HANDSHAKE_TYPE, value); michael@0: } michael@0: michael@0: // nsIDNSListener michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::OnLookupComplete(nsICancelable *aRequest, michael@0: nsIDNSRecord *aRecord, michael@0: nsresult aStatus) michael@0: { michael@0: LOG(("WebSocketChannel::OnLookupComplete() %p [%p %p %x]\n", michael@0: this, aRequest, aRecord, aStatus)); michael@0: michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: if (mStopped) { michael@0: LOG(("WebSocketChannel::OnLookupComplete: Request Already Stopped\n")); michael@0: mCancelable = nullptr; michael@0: return NS_OK; michael@0: } michael@0: michael@0: mCancelable = nullptr; michael@0: michael@0: // These failures are not fatal - we just use the hostname as the key michael@0: if (NS_FAILED(aStatus)) { michael@0: LOG(("WebSocketChannel::OnLookupComplete: No DNS Response\n")); michael@0: michael@0: // set host in case we got here without calling DoAdmissionDNS() michael@0: mURI->GetHost(mAddress); michael@0: } else { michael@0: nsresult rv = aRecord->GetNextAddrAsString(mAddress); michael@0: if (NS_FAILED(rv)) michael@0: LOG(("WebSocketChannel::OnLookupComplete: Failed GetNextAddr\n")); michael@0: } michael@0: michael@0: LOG(("WebSocket OnLookupComplete: Proceeding to ConditionallyConnect\n")); michael@0: nsWSAdmissionManager::ConditionallyConnect(this); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: // nsIProtocolProxyCallback michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::OnProxyAvailable(nsICancelable *aRequest, nsIChannel *aChannel, michael@0: nsIProxyInfo *pi, nsresult status) michael@0: { michael@0: if (mStopped) { michael@0: LOG(("WebSocketChannel::OnProxyAvailable: [%p] Request Already Stopped\n", this)); michael@0: mCancelable = nullptr; michael@0: return NS_OK; michael@0: } michael@0: michael@0: MOZ_ASSERT(aRequest == mCancelable); michael@0: mCancelable = nullptr; michael@0: michael@0: nsAutoCString type; michael@0: if (NS_SUCCEEDED(status) && pi && michael@0: NS_SUCCEEDED(pi->GetType(type)) && michael@0: !type.EqualsLiteral("direct")) { michael@0: LOG(("WebSocket OnProxyAvailable [%p] Proxy found skip DNS lookup\n", this)); michael@0: // call DNS callback directly without DNS resolver michael@0: OnLookupComplete(nullptr, nullptr, NS_ERROR_FAILURE); michael@0: return NS_OK; michael@0: } michael@0: michael@0: LOG(("WebSocketChannel::OnProxyAvailable[%] checking DNS resolution\n", this)); michael@0: DoAdmissionDNS(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: // nsIInterfaceRequestor michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::GetInterface(const nsIID & iid, void **result) michael@0: { michael@0: LOG(("WebSocketChannel::GetInterface() %p\n", this)); michael@0: michael@0: if (iid.Equals(NS_GET_IID(nsIChannelEventSink))) michael@0: return QueryInterface(iid, result); michael@0: michael@0: if (mCallbacks) michael@0: return mCallbacks->GetInterface(iid, result); michael@0: michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: // nsIChannelEventSink michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::AsyncOnChannelRedirect( michael@0: nsIChannel *oldChannel, michael@0: nsIChannel *newChannel, michael@0: uint32_t flags, michael@0: nsIAsyncVerifyRedirectCallback *callback) michael@0: { michael@0: LOG(("WebSocketChannel::AsyncOnChannelRedirect() %p\n", this)); michael@0: nsresult rv; michael@0: michael@0: nsCOMPtr newuri; michael@0: rv = newChannel->GetURI(getter_AddRefs(newuri)); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: // newuri is expected to be http or https michael@0: bool newuriIsHttps = false; michael@0: rv = newuri->SchemeIs("https", &newuriIsHttps); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: if (!mAutoFollowRedirects) { michael@0: // Even if redirects configured off, still allow them for HTTP Strict michael@0: // Transport Security (from ws://FOO to https://FOO (mapped to wss://FOO) michael@0: michael@0: nsCOMPtr clonedNewURI; michael@0: rv = newuri->Clone(getter_AddRefs(clonedNewURI)); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: rv = clonedNewURI->SetScheme(NS_LITERAL_CSTRING("ws")); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: nsCOMPtr currentURI; michael@0: rv = GetURI(getter_AddRefs(currentURI)); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: // currentURI is expected to be ws or wss michael@0: bool currentIsHttps = false; michael@0: rv = currentURI->SchemeIs("wss", ¤tIsHttps); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: bool uriEqual = false; michael@0: rv = clonedNewURI->Equals(currentURI, &uriEqual); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: // It's only a HSTS redirect if we started with non-secure, are going to michael@0: // secure, and the new URI is otherwise the same as the old one. michael@0: if (!(!currentIsHttps && newuriIsHttps && uriEqual)) { michael@0: nsAutoCString newSpec; michael@0: rv = newuri->GetSpec(newSpec); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: LOG(("WebSocketChannel: Redirect to %s denied by configuration\n", michael@0: newSpec.get())); michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: } michael@0: michael@0: if (mEncrypted && !newuriIsHttps) { michael@0: nsAutoCString spec; michael@0: if (NS_SUCCEEDED(newuri->GetSpec(spec))) michael@0: LOG(("WebSocketChannel: Redirect to %s violates encryption rule\n", michael@0: spec.get())); michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: nsCOMPtr newHttpChannel = do_QueryInterface(newChannel, &rv); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel: Redirect could not QI to HTTP\n")); michael@0: return rv; michael@0: } michael@0: michael@0: nsCOMPtr newUpgradeChannel = michael@0: do_QueryInterface(newChannel, &rv); michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel: Redirect could not QI to HTTP Upgrade\n")); michael@0: return rv; michael@0: } michael@0: michael@0: // The redirect is likely OK michael@0: michael@0: newChannel->SetNotificationCallbacks(this); michael@0: michael@0: mEncrypted = newuriIsHttps; michael@0: newuri->Clone(getter_AddRefs(mURI)); michael@0: if (mEncrypted) michael@0: rv = mURI->SetScheme(NS_LITERAL_CSTRING("wss")); michael@0: else michael@0: rv = mURI->SetScheme(NS_LITERAL_CSTRING("ws")); michael@0: michael@0: mHttpChannel = newHttpChannel; michael@0: mChannel = newUpgradeChannel; michael@0: rv = SetupRequest(); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel: Redirect could not SetupRequest()\n")); michael@0: return rv; michael@0: } michael@0: michael@0: // Redirected-to URI may need to be delayed by 1-connecting-per-host and michael@0: // delay-after-fail algorithms. So hold off calling OnRedirectVerifyCallback michael@0: // until BeginOpen, when we know it's OK to proceed with new channel. michael@0: mRedirectCallback = callback; michael@0: michael@0: // Mark old channel as successfully connected so we'll clear any FailDelay michael@0: // associated with the old URI. Note: no need to also call OnStopSession: michael@0: // it's a no-op for successful, already-connected channels. michael@0: nsWSAdmissionManager::OnConnected(this); michael@0: michael@0: // ApplyForAdmission as if we were starting from fresh... michael@0: mAddress.Truncate(); michael@0: mOpenedHttpChannel = 0; michael@0: rv = ApplyForAdmission(); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel: Redirect failed due to DNS failure\n")); michael@0: mRedirectCallback = nullptr; michael@0: return rv; michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: // nsITimerCallback michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::Notify(nsITimer *timer) michael@0: { michael@0: LOG(("WebSocketChannel::Notify() %p [%p]\n", this, timer)); michael@0: michael@0: if (timer == mCloseTimer) { michael@0: NS_ABORT_IF_FALSE(mClientClosed, "Close Timeout without local close"); michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, michael@0: "not socket thread"); michael@0: michael@0: mCloseTimer = nullptr; michael@0: if (mStopped || mServerClosed) /* no longer relevant */ michael@0: return NS_OK; michael@0: michael@0: LOG(("WebSocketChannel:: Expecting Server Close - Timed Out\n")); michael@0: AbortSession(NS_ERROR_NET_TIMEOUT); michael@0: } else if (timer == mOpenTimer) { michael@0: NS_ABORT_IF_FALSE(!mGotUpgradeOK, michael@0: "Open Timer after open complete"); michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: mOpenTimer = nullptr; michael@0: LOG(("WebSocketChannel:: Connection Timed Out\n")); michael@0: if (mStopped || mServerClosed) /* no longer relevant */ michael@0: return NS_OK; michael@0: michael@0: AbortSession(NS_ERROR_NET_TIMEOUT); michael@0: } else if (timer == mReconnectDelayTimer) { michael@0: NS_ABORT_IF_FALSE(mConnecting == CONNECTING_DELAYED, michael@0: "woke up from delay w/o being delayed?"); michael@0: michael@0: mReconnectDelayTimer = nullptr; michael@0: LOG(("WebSocketChannel: connecting [this=%p] after reconnect delay", this)); michael@0: BeginOpen(); michael@0: } else if (timer == mPingTimer) { michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, michael@0: "not socket thread"); michael@0: michael@0: if (mClientClosed || mServerClosed || mRequestedClose) { michael@0: // no point in worrying about ping now michael@0: mPingTimer = nullptr; michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (!mPingOutstanding) { michael@0: LOG(("nsWebSocketChannel:: Generating Ping\n")); michael@0: mPingOutstanding = 1; michael@0: GeneratePing(); michael@0: mPingTimer->InitWithCallback(this, mPingResponseTimeout, michael@0: nsITimer::TYPE_ONE_SHOT); michael@0: } else { michael@0: LOG(("nsWebSocketChannel:: Timed out Ping\n")); michael@0: mPingTimer = nullptr; michael@0: AbortSession(NS_ERROR_NET_TIMEOUT); michael@0: } michael@0: } else if (timer == mLingeringCloseTimer) { michael@0: LOG(("WebSocketChannel:: Lingering Close Timer")); michael@0: CleanupConnection(); michael@0: } else { michael@0: NS_ABORT_IF_FALSE(0, "Unknown Timer"); michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: // nsIWebSocketChannel michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::GetSecurityInfo(nsISupports **aSecurityInfo) michael@0: { michael@0: LOG(("WebSocketChannel::GetSecurityInfo() %p\n", this)); michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: if (mTransport) { michael@0: if (NS_FAILED(mTransport->GetSecurityInfo(aSecurityInfo))) michael@0: *aSecurityInfo = nullptr; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::AsyncOpen(nsIURI *aURI, michael@0: const nsACString &aOrigin, michael@0: nsIWebSocketListener *aListener, michael@0: nsISupports *aContext) michael@0: { michael@0: LOG(("WebSocketChannel::AsyncOpen() %p\n", this)); michael@0: michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: if (!aURI || !aListener) { michael@0: LOG(("WebSocketChannel::AsyncOpen() Uri or Listener null")); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: if (mListener || mWasOpened) michael@0: return NS_ERROR_ALREADY_OPENED; michael@0: michael@0: nsresult rv; michael@0: michael@0: // Ensure target thread is set. michael@0: if (!mTargetThread) { michael@0: mTargetThread = do_GetMainThread(); michael@0: } michael@0: michael@0: mSocketThread = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); michael@0: if (NS_FAILED(rv)) { michael@0: NS_WARNING("unable to continue without socket transport service"); michael@0: return rv; michael@0: } michael@0: michael@0: mRandomGenerator = michael@0: do_GetService("@mozilla.org/security/random-generator;1", &rv); michael@0: if (NS_FAILED(rv)) { michael@0: NS_WARNING("unable to continue without random number generator"); michael@0: return rv; michael@0: } michael@0: michael@0: nsCOMPtr prefService; michael@0: prefService = do_GetService(NS_PREFSERVICE_CONTRACTID); michael@0: michael@0: if (prefService) { michael@0: int32_t intpref; michael@0: bool boolpref; michael@0: rv = prefService->GetIntPref("network.websocket.max-message-size", michael@0: &intpref); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mMaxMessageSize = clamped(intpref, 1024, INT32_MAX); michael@0: } michael@0: rv = prefService->GetIntPref("network.websocket.timeout.close", &intpref); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mCloseTimeout = clamped(intpref, 1, 1800) * 1000; michael@0: } michael@0: rv = prefService->GetIntPref("network.websocket.timeout.open", &intpref); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mOpenTimeout = clamped(intpref, 1, 1800) * 1000; michael@0: } michael@0: rv = prefService->GetIntPref("network.websocket.timeout.ping.request", michael@0: &intpref); michael@0: if (NS_SUCCEEDED(rv) && !mClientSetPingInterval) { michael@0: mPingInterval = clamped(intpref, 0, 86400) * 1000; michael@0: } michael@0: rv = prefService->GetIntPref("network.websocket.timeout.ping.response", michael@0: &intpref); michael@0: if (NS_SUCCEEDED(rv) && !mClientSetPingTimeout) { michael@0: mPingResponseTimeout = clamped(intpref, 1, 3600) * 1000; michael@0: } michael@0: rv = prefService->GetBoolPref("network.websocket.extensions.stream-deflate", michael@0: &boolpref); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mAllowCompression = boolpref ? 1 : 0; michael@0: } michael@0: rv = prefService->GetBoolPref("network.websocket.auto-follow-http-redirects", michael@0: &boolpref); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mAutoFollowRedirects = boolpref ? 1 : 0; michael@0: } michael@0: rv = prefService->GetIntPref michael@0: ("network.websocket.max-connections", &intpref); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: mMaxConcurrentConnections = clamped(intpref, 1, 0xffff); michael@0: } michael@0: } michael@0: michael@0: int32_t sessionCount = -1; michael@0: nsWSAdmissionManager::GetSessionCount(sessionCount); michael@0: if (sessionCount >= 0) { michael@0: LOG(("WebSocketChannel::AsyncOpen %p sessionCount=%d max=%d\n", this, michael@0: sessionCount, mMaxConcurrentConnections)); michael@0: } michael@0: michael@0: if (sessionCount >= mMaxConcurrentConnections) { michael@0: LOG(("WebSocketChannel: max concurrency %d exceeded (%d)", michael@0: mMaxConcurrentConnections, michael@0: sessionCount)); michael@0: michael@0: // WebSocket connections are expected to be long lived, so return michael@0: // an error here instead of queueing michael@0: return NS_ERROR_SOCKET_CREATE_FAILED; michael@0: } michael@0: michael@0: mOriginalURI = aURI; michael@0: mURI = mOriginalURI; michael@0: mURI->GetHostPort(mHost); michael@0: mOrigin = aOrigin; michael@0: michael@0: nsCOMPtr localURI; michael@0: nsCOMPtr localChannel; michael@0: michael@0: mURI->Clone(getter_AddRefs(localURI)); michael@0: if (mEncrypted) michael@0: rv = localURI->SetScheme(NS_LITERAL_CSTRING("https")); michael@0: else michael@0: rv = localURI->SetScheme(NS_LITERAL_CSTRING("http")); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: nsCOMPtr ioService; michael@0: ioService = do_GetService(NS_IOSERVICE_CONTRACTID, &rv); michael@0: if (NS_FAILED(rv)) { michael@0: NS_WARNING("unable to continue without io service"); michael@0: return rv; michael@0: } michael@0: michael@0: nsCOMPtr io2 = do_QueryInterface(ioService, &rv); michael@0: if (NS_FAILED(rv)) { michael@0: NS_WARNING("WebSocketChannel: unable to continue without ioservice2"); michael@0: return rv; michael@0: } michael@0: michael@0: rv = io2->NewChannelFromURIWithProxyFlags( michael@0: localURI, michael@0: mURI, michael@0: nsIProtocolProxyService::RESOLVE_PREFER_HTTPS_PROXY | michael@0: nsIProtocolProxyService::RESOLVE_ALWAYS_TUNNEL, michael@0: getter_AddRefs(localChannel)); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: // Pass most GetInterface() requests through to our instantiator, but handle michael@0: // nsIChannelEventSink in this object in order to deal with redirects michael@0: localChannel->SetNotificationCallbacks(this); michael@0: michael@0: mChannel = do_QueryInterface(localChannel, &rv); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: mHttpChannel = do_QueryInterface(localChannel, &rv); michael@0: NS_ENSURE_SUCCESS(rv, rv); michael@0: michael@0: rv = SetupRequest(); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: mPrivateBrowsing = NS_UsePrivateBrowsing(localChannel); michael@0: michael@0: if (mConnectionLogService && !mPrivateBrowsing) { michael@0: mConnectionLogService->AddHost(mHost, mSerial, michael@0: BaseWebSocketChannel::mEncrypted); michael@0: } michael@0: michael@0: rv = ApplyForAdmission(); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: // Only set these if the open was successful: michael@0: // michael@0: mWasOpened = 1; michael@0: mListener = aListener; michael@0: mContext = aContext; michael@0: IncrementSessionCount(); michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::Close(uint16_t code, const nsACString & reason) michael@0: { michael@0: LOG(("WebSocketChannel::Close() %p\n", this)); michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: // save the networkstats (bug 855949) michael@0: SaveNetworkStats(true); michael@0: michael@0: if (mRequestedClose) { michael@0: return NS_OK; michael@0: } michael@0: michael@0: // The API requires the UTF-8 string to be 123 or less bytes michael@0: if (reason.Length() > 123) michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: michael@0: mRequestedClose = 1; michael@0: mScriptCloseReason = reason; michael@0: mScriptCloseCode = code; michael@0: michael@0: if (!mTransport) { michael@0: nsresult rv; michael@0: if (code == CLOSE_GOING_AWAY) { michael@0: // Not an error: for example, tab has closed or navigated away michael@0: LOG(("WebSocketChannel::Close() GOING_AWAY without transport.")); michael@0: rv = NS_OK; michael@0: } else { michael@0: LOG(("WebSocketChannel::Close() without transport - error.")); michael@0: rv = NS_ERROR_NOT_CONNECTED; michael@0: } michael@0: StopSession(rv); michael@0: return rv; michael@0: } michael@0: michael@0: return mSocketThread->Dispatch( michael@0: new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nullptr)), michael@0: nsIEventTarget::DISPATCH_NORMAL); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::SendMsg(const nsACString &aMsg) michael@0: { michael@0: LOG(("WebSocketChannel::SendMsg() %p\n", this)); michael@0: michael@0: return SendMsgCommon(&aMsg, false, aMsg.Length()); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::SendBinaryMsg(const nsACString &aMsg) michael@0: { michael@0: LOG(("WebSocketChannel::SendBinaryMsg() %p len=%d\n", this, aMsg.Length())); michael@0: return SendMsgCommon(&aMsg, true, aMsg.Length()); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::SendBinaryStream(nsIInputStream *aStream, uint32_t aLength) michael@0: { michael@0: LOG(("WebSocketChannel::SendBinaryStream() %p\n", this)); michael@0: michael@0: return SendMsgCommon(nullptr, true, aLength, aStream); michael@0: } michael@0: michael@0: nsresult michael@0: WebSocketChannel::SendMsgCommon(const nsACString *aMsg, bool aIsBinary, michael@0: uint32_t aLength, nsIInputStream *aStream) michael@0: { michael@0: NS_ABORT_IF_FALSE(NS_GetCurrentThread() == mTargetThread, "not target thread"); michael@0: michael@0: if (mRequestedClose) { michael@0: LOG(("WebSocketChannel:: Error: send when closed\n")); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: if (mStopped) { michael@0: LOG(("WebSocketChannel:: Error: send when stopped\n")); michael@0: return NS_ERROR_NOT_CONNECTED; michael@0: } michael@0: michael@0: NS_ABORT_IF_FALSE(mMaxMessageSize >= 0, "max message size negative"); michael@0: if (aLength > static_cast(mMaxMessageSize)) { michael@0: LOG(("WebSocketChannel:: Error: message too big\n")); michael@0: return NS_ERROR_FILE_TOO_BIG; michael@0: } michael@0: michael@0: if (mConnectionLogService && !mPrivateBrowsing) { michael@0: mConnectionLogService->NewMsgSent(mHost, mSerial, aLength); michael@0: LOG(("Added new msg sent for %s", mHost.get())); michael@0: } michael@0: michael@0: return mSocketThread->Dispatch( michael@0: aStream ? new OutboundEnqueuer(this, new OutboundMessage(aStream, aLength)) michael@0: : new OutboundEnqueuer(this, michael@0: new OutboundMessage(aIsBinary ? kMsgTypeBinaryString michael@0: : kMsgTypeString, michael@0: new nsCString(*aMsg))), michael@0: nsIEventTarget::DISPATCH_NORMAL); michael@0: } michael@0: michael@0: // nsIHttpUpgradeListener michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport, michael@0: nsIAsyncInputStream *aSocketIn, michael@0: nsIAsyncOutputStream *aSocketOut) michael@0: { michael@0: if (!NS_IsMainThread()) { michael@0: return NS_DispatchToMainThread(new CallOnTransportAvailable(this, michael@0: aTransport, michael@0: aSocketIn, michael@0: aSocketOut)); michael@0: } michael@0: michael@0: LOG(("WebSocketChannel::OnTransportAvailable %p [%p %p %p] rcvdonstart=%d\n", michael@0: this, aTransport, aSocketIn, aSocketOut, mGotUpgradeOK)); michael@0: michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: NS_ABORT_IF_FALSE(!mRecvdHttpUpgradeTransport, "OTA duplicated"); michael@0: NS_ABORT_IF_FALSE(aSocketIn, "OTA with invalid socketIn"); michael@0: michael@0: mTransport = aTransport; michael@0: mSocketIn = aSocketIn; michael@0: mSocketOut = aSocketOut; michael@0: michael@0: nsresult rv; michael@0: rv = mTransport->SetEventSink(nullptr, nullptr); michael@0: if (NS_FAILED(rv)) return rv; michael@0: rv = mTransport->SetSecurityCallbacks(this); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: mRecvdHttpUpgradeTransport = 1; michael@0: if (mGotUpgradeOK) michael@0: return StartWebsocketData(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: // nsIRequestObserver (from nsIStreamListener) michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::OnStartRequest(nsIRequest *aRequest, michael@0: nsISupports *aContext) michael@0: { michael@0: LOG(("WebSocketChannel::OnStartRequest(): %p [%p %p] recvdhttpupgrade=%d\n", michael@0: this, aRequest, aContext, mRecvdHttpUpgradeTransport)); michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: NS_ABORT_IF_FALSE(!mGotUpgradeOK, "OTA duplicated"); michael@0: michael@0: if (mOpenTimer) { michael@0: mOpenTimer->Cancel(); michael@0: mOpenTimer = nullptr; michael@0: } michael@0: michael@0: if (mStopped) { michael@0: LOG(("WebSocketChannel::OnStartRequest: Channel Already Done\n")); michael@0: AbortSession(NS_ERROR_CONNECTION_REFUSED); michael@0: return NS_ERROR_CONNECTION_REFUSED; michael@0: } michael@0: michael@0: nsresult rv; michael@0: uint32_t status; michael@0: char *val, *token; michael@0: michael@0: rv = mHttpChannel->GetResponseStatus(&status); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel::OnStartRequest: No HTTP Response\n")); michael@0: AbortSession(NS_ERROR_CONNECTION_REFUSED); michael@0: return NS_ERROR_CONNECTION_REFUSED; michael@0: } michael@0: michael@0: LOG(("WebSocketChannel::OnStartRequest: HTTP status %d\n", status)); michael@0: if (status != 101) { michael@0: AbortSession(NS_ERROR_CONNECTION_REFUSED); michael@0: return NS_ERROR_CONNECTION_REFUSED; michael@0: } michael@0: michael@0: nsAutoCString respUpgrade; michael@0: rv = mHttpChannel->GetResponseHeader( michael@0: NS_LITERAL_CSTRING("Upgrade"), respUpgrade); michael@0: michael@0: if (NS_SUCCEEDED(rv)) { michael@0: rv = NS_ERROR_ILLEGAL_VALUE; michael@0: if (!respUpgrade.IsEmpty()) { michael@0: val = respUpgrade.BeginWriting(); michael@0: while ((token = nsCRT::strtok(val, ", \t", &val))) { michael@0: if (PL_strcasecmp(token, "Websocket") == 0) { michael@0: rv = NS_OK; michael@0: break; michael@0: } michael@0: } michael@0: } michael@0: } michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel::OnStartRequest: " michael@0: "HTTP response header Upgrade: websocket not found\n")); michael@0: AbortSession(NS_ERROR_ILLEGAL_VALUE); michael@0: return rv; michael@0: } michael@0: michael@0: nsAutoCString respConnection; michael@0: rv = mHttpChannel->GetResponseHeader( michael@0: NS_LITERAL_CSTRING("Connection"), respConnection); michael@0: michael@0: if (NS_SUCCEEDED(rv)) { michael@0: rv = NS_ERROR_ILLEGAL_VALUE; michael@0: if (!respConnection.IsEmpty()) { michael@0: val = respConnection.BeginWriting(); michael@0: while ((token = nsCRT::strtok(val, ", \t", &val))) { michael@0: if (PL_strcasecmp(token, "Upgrade") == 0) { michael@0: rv = NS_OK; michael@0: break; michael@0: } michael@0: } michael@0: } michael@0: } michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("WebSocketChannel::OnStartRequest: " michael@0: "HTTP response header 'Connection: Upgrade' not found\n")); michael@0: AbortSession(NS_ERROR_ILLEGAL_VALUE); michael@0: return rv; michael@0: } michael@0: michael@0: nsAutoCString respAccept; michael@0: rv = mHttpChannel->GetResponseHeader( michael@0: NS_LITERAL_CSTRING("Sec-WebSocket-Accept"), michael@0: respAccept); michael@0: michael@0: if (NS_FAILED(rv) || michael@0: respAccept.IsEmpty() || !respAccept.Equals(mHashedSecret)) { michael@0: LOG(("WebSocketChannel::OnStartRequest: " michael@0: "HTTP response header Sec-WebSocket-Accept check failed\n")); michael@0: LOG(("WebSocketChannel::OnStartRequest: Expected %s received %s\n", michael@0: mHashedSecret.get(), respAccept.get())); michael@0: AbortSession(NS_ERROR_ILLEGAL_VALUE); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: // If we sent a sub protocol header, verify the response matches michael@0: // If it does not, set mProtocol to "" so the protocol attribute michael@0: // of the WebSocket JS object reflects that michael@0: if (!mProtocol.IsEmpty()) { michael@0: nsAutoCString respProtocol; michael@0: rv = mHttpChannel->GetResponseHeader( michael@0: NS_LITERAL_CSTRING("Sec-WebSocket-Protocol"), michael@0: respProtocol); michael@0: if (NS_SUCCEEDED(rv)) { michael@0: rv = NS_ERROR_ILLEGAL_VALUE; michael@0: val = mProtocol.BeginWriting(); michael@0: while ((token = nsCRT::strtok(val, ", \t", &val))) { michael@0: if (PL_strcasecmp(token, respProtocol.get()) == 0) { michael@0: rv = NS_OK; michael@0: break; michael@0: } michael@0: } michael@0: michael@0: if (NS_SUCCEEDED(rv)) { michael@0: LOG(("WebsocketChannel::OnStartRequest: subprotocol %s confirmed", michael@0: respProtocol.get())); michael@0: mProtocol = respProtocol; michael@0: } else { michael@0: LOG(("WebsocketChannel::OnStartRequest: " michael@0: "subprotocol [%s] not found - %s returned", michael@0: mProtocol.get(), respProtocol.get())); michael@0: mProtocol.Truncate(); michael@0: } michael@0: } else { michael@0: LOG(("WebsocketChannel::OnStartRequest " michael@0: "subprotocol [%s] not found - none returned", michael@0: mProtocol.get())); michael@0: mProtocol.Truncate(); michael@0: } michael@0: } michael@0: michael@0: rv = HandleExtensions(); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: mGotUpgradeOK = 1; michael@0: if (mRecvdHttpUpgradeTransport) michael@0: return StartWebsocketData(); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::OnStopRequest(nsIRequest *aRequest, michael@0: nsISupports *aContext, michael@0: nsresult aStatusCode) michael@0: { michael@0: LOG(("WebSocketChannel::OnStopRequest() %p [%p %p %x]\n", michael@0: this, aRequest, aContext, aStatusCode)); michael@0: NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); michael@0: michael@0: ReportConnectionTelemetry(); michael@0: michael@0: // This is the end of the HTTP upgrade transaction, the michael@0: // upgraded streams live on michael@0: michael@0: mChannel = nullptr; michael@0: mHttpChannel = nullptr; michael@0: mLoadGroup = nullptr; michael@0: mCallbacks = nullptr; michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: // nsIInputStreamCallback michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::OnInputStreamReady(nsIAsyncInputStream *aStream) michael@0: { michael@0: LOG(("WebSocketChannel::OnInputStreamReady() %p\n", this)); michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); michael@0: michael@0: if (!mSocketIn) // did we we clean up the socket after scheduling InputReady? michael@0: return NS_OK; michael@0: michael@0: nsRefPtr deleteProtector1(mInflateReader); michael@0: nsRefPtr deleteProtector2(mInflateStream); michael@0: michael@0: // this is after the http upgrade - so we are speaking websockets michael@0: char buffer[2048]; michael@0: uint32_t count; michael@0: nsresult rv; michael@0: michael@0: do { michael@0: rv = mSocketIn->Read((char *)buffer, 2048, &count); michael@0: LOG(("WebSocketChannel::OnInputStreamReady: read %u rv %x\n", count, rv)); michael@0: michael@0: // accumulate received bytes michael@0: CountRecvBytes(count); michael@0: michael@0: if (rv == NS_BASE_STREAM_WOULD_BLOCK) { michael@0: mSocketIn->AsyncWait(this, 0, 0, mSocketThread); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: mTCPClosed = true; michael@0: AbortSession(rv); michael@0: return rv; michael@0: } michael@0: michael@0: if (count == 0) { michael@0: mTCPClosed = true; michael@0: AbortSession(NS_BASE_STREAM_CLOSED); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (mStopped) { michael@0: continue; michael@0: } michael@0: michael@0: if (mInflateReader) { michael@0: mInflateStream->ShareData(buffer, count); michael@0: rv = mInflateReader->OnDataAvailable(nullptr, mSocketIn, mInflateStream, michael@0: 0, count); michael@0: } else { michael@0: rv = ProcessInput((uint8_t *)buffer, count); michael@0: } michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: AbortSession(rv); michael@0: return rv; michael@0: } michael@0: } while (NS_SUCCEEDED(rv) && mSocketIn); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: michael@0: // nsIOutputStreamCallback michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::OnOutputStreamReady(nsIAsyncOutputStream *aStream) michael@0: { michael@0: LOG(("WebSocketChannel::OnOutputStreamReady() %p\n", this)); michael@0: NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); michael@0: nsresult rv; michael@0: michael@0: if (!mCurrentOut) michael@0: PrimeNewOutgoingMessage(); michael@0: michael@0: while (mCurrentOut && mSocketOut) { michael@0: const char *sndBuf; michael@0: uint32_t toSend; michael@0: uint32_t amtSent; michael@0: michael@0: if (mHdrOut) { michael@0: sndBuf = (const char *)mHdrOut; michael@0: toSend = mHdrOutToSend; michael@0: LOG(("WebSocketChannel::OnOutputStreamReady: " michael@0: "Try to send %u of hdr/copybreak\n", toSend)); michael@0: } else { michael@0: sndBuf = (char *) mCurrentOut->BeginReading() + mCurrentOutSent; michael@0: toSend = mCurrentOut->Length() - mCurrentOutSent; michael@0: if (toSend > 0) { michael@0: LOG(("WebSocketChannel::OnOutputStreamReady: " michael@0: "Try to send %u of data\n", toSend)); michael@0: } michael@0: } michael@0: michael@0: if (toSend == 0) { michael@0: amtSent = 0; michael@0: } else { michael@0: rv = mSocketOut->Write(sndBuf, toSend, &amtSent); michael@0: LOG(("WebSocketChannel::OnOutputStreamReady: write %u rv %x\n", michael@0: amtSent, rv)); michael@0: michael@0: // accumulate sent bytes michael@0: CountSentBytes(amtSent); michael@0: michael@0: if (rv == NS_BASE_STREAM_WOULD_BLOCK) { michael@0: mSocketOut->AsyncWait(this, 0, 0, mSocketThread); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: AbortSession(rv); michael@0: return NS_OK; michael@0: } michael@0: } michael@0: michael@0: if (mHdrOut) { michael@0: if (amtSent == toSend) { michael@0: mHdrOut = nullptr; michael@0: mHdrOutToSend = 0; michael@0: } else { michael@0: mHdrOut += amtSent; michael@0: mHdrOutToSend -= amtSent; michael@0: } michael@0: } else { michael@0: if (amtSent == toSend) { michael@0: if (!mStopped) { michael@0: mTargetThread->Dispatch(new CallAcknowledge(this, michael@0: mCurrentOut->Length()), michael@0: NS_DISPATCH_NORMAL); michael@0: } michael@0: DeleteCurrentOutGoingMessage(); michael@0: PrimeNewOutgoingMessage(); michael@0: } else { michael@0: mCurrentOutSent += amtSent; michael@0: } michael@0: } michael@0: } michael@0: michael@0: if (mReleaseOnTransmit) michael@0: ReleaseSession(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: // nsIStreamListener michael@0: michael@0: NS_IMETHODIMP michael@0: WebSocketChannel::OnDataAvailable(nsIRequest *aRequest, michael@0: nsISupports *aContext, michael@0: nsIInputStream *aInputStream, michael@0: uint64_t aOffset, michael@0: uint32_t aCount) michael@0: { michael@0: LOG(("WebSocketChannel::OnDataAvailable() %p [%p %p %p %llu %u]\n", michael@0: this, aRequest, aContext, aInputStream, aOffset, aCount)); michael@0: michael@0: if (aContext == mSocketIn) { michael@0: // This is the deflate decoder michael@0: michael@0: LOG(("WebSocketChannel::OnDataAvailable: Deflate Data %u\n", michael@0: aCount)); michael@0: michael@0: uint8_t buffer[2048]; michael@0: uint32_t maxRead; michael@0: uint32_t count; michael@0: nsresult rv = NS_OK; // aCount always > 0, so this just avoids warning michael@0: michael@0: while (aCount > 0) { michael@0: if (mStopped) michael@0: return NS_BASE_STREAM_CLOSED; michael@0: michael@0: maxRead = std::min(2048U, aCount); michael@0: rv = aInputStream->Read((char *)buffer, maxRead, &count); michael@0: LOG(("WebSocketChannel::OnDataAvailable: InflateRead read %u rv %x\n", michael@0: count, rv)); michael@0: if (NS_FAILED(rv) || count == 0) { michael@0: AbortSession(NS_ERROR_UNEXPECTED); michael@0: break; michael@0: } michael@0: michael@0: aCount -= count; michael@0: rv = ProcessInput(buffer, count); michael@0: if (NS_FAILED(rv)) { michael@0: AbortSession(rv); michael@0: break; michael@0: } michael@0: } michael@0: return rv; michael@0: } michael@0: michael@0: if (aContext == mSocketOut) { michael@0: // This is the deflate encoder michael@0: michael@0: uint32_t maxRead; michael@0: uint32_t count; michael@0: nsresult rv; michael@0: michael@0: while (aCount > 0) { michael@0: if (mStopped) michael@0: return NS_BASE_STREAM_CLOSED; michael@0: michael@0: maxRead = std::min(2048U, aCount); michael@0: EnsureHdrOut(mHdrOutToSend + aCount); michael@0: rv = aInputStream->Read((char *)mHdrOut + mHdrOutToSend, maxRead, &count); michael@0: LOG(("WebSocketChannel::OnDataAvailable: DeflateWrite read %u rv %x\n", michael@0: count, rv)); michael@0: if (NS_FAILED(rv) || count == 0) { michael@0: AbortSession(rv); michael@0: break; michael@0: } michael@0: michael@0: mHdrOutToSend += count; michael@0: aCount -= count; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: michael@0: // Otherwise, this is the HTTP OnDataAvailable Method, which means michael@0: // this is http data in response to the upgrade request and michael@0: // there should be no http response body if the upgrade succeeded michael@0: michael@0: // This generally should be caught by a non 101 response code in michael@0: // OnStartRequest().. so we can ignore the data here michael@0: michael@0: LOG(("WebSocketChannel::OnDataAvailable: HTTP data unexpected len>=%u\n", michael@0: aCount)); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: WebSocketChannel::SaveNetworkStats(bool enforce) michael@0: { michael@0: #ifdef MOZ_WIDGET_GONK michael@0: // Check if the active network and app id are valid. michael@0: if(!mActiveNetwork || mAppId == NECKO_NO_APP_ID) { michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (mCountRecv <= 0 && mCountSent <= 0) { michael@0: // There is no traffic, no need to save. michael@0: return NS_OK; michael@0: } michael@0: michael@0: // If |enforce| is false, the traffic amount is saved michael@0: // only when the total amount exceeds the predefined michael@0: // threshold. michael@0: uint64_t totalBytes = mCountRecv + mCountSent; michael@0: if (!enforce && totalBytes < NETWORK_STATS_THRESHOLD) { michael@0: return NS_OK; michael@0: } michael@0: michael@0: // Create the event to save the network statistics. michael@0: // the event is then dispathed to the main thread. michael@0: nsRefPtr event = michael@0: new SaveNetworkStatsEvent(mAppId, mActiveNetwork, michael@0: mCountRecv, mCountSent, false); michael@0: NS_DispatchToMainThread(event); michael@0: michael@0: // Reset the counters after saving. michael@0: mCountSent = 0; michael@0: mCountRecv = 0; michael@0: michael@0: return NS_OK; michael@0: #else michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: #endif michael@0: } michael@0: michael@0: } // namespace mozilla::net michael@0: } // namespace mozilla