michael@0: /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* vim: set sw=2 ts=8 et tw=80 : */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: // HttpLog.h should generally be included first michael@0: #include "HttpLog.h" michael@0: michael@0: // Log on level :5, instead of default :4. michael@0: #undef LOG michael@0: #define LOG(args) LOG5(args) michael@0: #undef LOG_ENABLED michael@0: #define LOG_ENABLED() LOG5_ENABLED() michael@0: michael@0: #include "mozilla/Endian.h" michael@0: #include "mozilla/Telemetry.h" michael@0: #include "nsHttp.h" michael@0: #include "nsHttpHandler.h" michael@0: #include "nsILoadGroup.h" michael@0: #include "prprf.h" michael@0: #include "SpdyPush3.h" michael@0: #include "SpdySession3.h" michael@0: #include "SpdyStream3.h" michael@0: #include "PSpdyPush.h" michael@0: #include "SpdyZlibReporter.h" michael@0: michael@0: #include michael@0: michael@0: #ifdef DEBUG michael@0: // defined by the socket transport service while active michael@0: extern PRThread *gSocketThread; michael@0: #endif michael@0: michael@0: namespace mozilla { michael@0: namespace net { michael@0: michael@0: // SpdySession3 has multiple inheritance of things that implement michael@0: // nsISupports, so this magic is taken from nsHttpPipeline that michael@0: // implements some of the same abstract classes. michael@0: NS_IMPL_ADDREF(SpdySession3) michael@0: NS_IMPL_RELEASE(SpdySession3) michael@0: NS_INTERFACE_MAP_BEGIN(SpdySession3) michael@0: NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsAHttpConnection) michael@0: NS_INTERFACE_MAP_END michael@0: michael@0: SpdySession3::SpdySession3(nsAHttpTransaction *aHttpTransaction, michael@0: nsISocketTransport *aSocketTransport, michael@0: int32_t firstPriority) michael@0: : mSocketTransport(aSocketTransport), michael@0: mSegmentReader(nullptr), michael@0: mSegmentWriter(nullptr), michael@0: mNextStreamID(1), michael@0: mConcurrentHighWater(0), michael@0: mDownstreamState(BUFFERING_FRAME_HEADER), michael@0: mInputFrameBufferSize(kDefaultBufferSize), michael@0: mInputFrameBufferUsed(0), michael@0: mInputFrameDataLast(false), michael@0: mInputFrameDataStream(nullptr), michael@0: mNeedsCleanup(nullptr), michael@0: mShouldGoAway(false), michael@0: mClosed(false), michael@0: mCleanShutdown(false), michael@0: mDataPending(false), michael@0: mGoAwayID(0), michael@0: mMaxConcurrent(kDefaultMaxConcurrent), michael@0: mConcurrent(0), michael@0: mServerPushedResources(0), michael@0: mServerInitialWindow(kDefaultServerRwin), michael@0: mOutputQueueSize(kDefaultQueueSize), michael@0: mOutputQueueUsed(0), michael@0: mOutputQueueSent(0), michael@0: mLastReadEpoch(PR_IntervalNow()), michael@0: mPingSentEpoch(0), michael@0: mNextPingID(1) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: static uint64_t sSerial; michael@0: mSerial = ++sSerial; michael@0: michael@0: LOG3(("SpdySession3::SpdySession3 %p transaction 1 = %p serial=0x%X\n", michael@0: this, aHttpTransaction, mSerial)); michael@0: michael@0: mConnection = aHttpTransaction->Connection(); michael@0: mInputFrameBuffer = new char[mInputFrameBufferSize]; michael@0: mOutputQueueBuffer = new char[mOutputQueueSize]; michael@0: zlibInit(); michael@0: michael@0: mPushAllowance = gHttpHandler->SpdyPushAllowance(); michael@0: mSendingChunkSize = gHttpHandler->SpdySendingChunkSize(); michael@0: GenerateSettings(); michael@0: michael@0: if (!aHttpTransaction->IsNullTransaction()) michael@0: AddStream(aHttpTransaction, firstPriority); michael@0: mLastDataReadEpoch = mLastReadEpoch; michael@0: michael@0: mPingThreshold = gHttpHandler->SpdyPingThreshold(); michael@0: } michael@0: michael@0: PLDHashOperator michael@0: SpdySession3::ShutdownEnumerator(nsAHttpTransaction *key, michael@0: nsAutoPtr &stream, michael@0: void *closure) michael@0: { michael@0: SpdySession3 *self = static_cast(closure); michael@0: michael@0: // On a clean server hangup the server sets the GoAwayID to be the ID of michael@0: // the last transaction it processed. If the ID of stream in the michael@0: // local stream is greater than that it can safely be restarted because the michael@0: // server guarantees it was not partially processed. Streams that have not michael@0: // registered an ID haven't actually been sent yet so they can always be michael@0: // restarted. michael@0: if (self->mCleanShutdown && michael@0: (stream->StreamID() > self->mGoAwayID || !stream->HasRegisteredID())) michael@0: self->CloseStream(stream, NS_ERROR_NET_RESET); // can be restarted michael@0: else michael@0: self->CloseStream(stream, NS_ERROR_ABORT); michael@0: michael@0: return PL_DHASH_NEXT; michael@0: } michael@0: michael@0: PLDHashOperator michael@0: SpdySession3::GoAwayEnumerator(nsAHttpTransaction *key, michael@0: nsAutoPtr &stream, michael@0: void *closure) michael@0: { michael@0: SpdySession3 *self = static_cast(closure); michael@0: michael@0: // these streams were not processed by the server and can be restarted. michael@0: // Do that after the enumerator completes to avoid the risk of michael@0: // a restart event re-entrantly modifying this hash. Be sure not to restart michael@0: // a pushed (even numbered) stream michael@0: if ((stream->StreamID() > self->mGoAwayID && (stream->StreamID() & 1)) || michael@0: !stream->HasRegisteredID()) { michael@0: self->mGoAwayStreamsToRestart.Push(stream); michael@0: } michael@0: michael@0: return PL_DHASH_NEXT; michael@0: } michael@0: michael@0: SpdySession3::~SpdySession3() michael@0: { michael@0: LOG3(("SpdySession3::~SpdySession3 %p mDownstreamState=%X", michael@0: this, mDownstreamState)); michael@0: michael@0: inflateEnd(&mDownstreamZlib); michael@0: deflateEnd(&mUpstreamZlib); michael@0: michael@0: mStreamTransactionHash.Enumerate(ShutdownEnumerator, this); michael@0: Telemetry::Accumulate(Telemetry::SPDY_PARALLEL_STREAMS, mConcurrentHighWater); michael@0: Telemetry::Accumulate(Telemetry::SPDY_REQUEST_PER_CONN, (mNextStreamID - 1) / 2); michael@0: Telemetry::Accumulate(Telemetry::SPDY_SERVER_INITIATED_STREAMS, michael@0: mServerPushedResources); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::LogIO(SpdySession3 *self, SpdyStream3 *stream, const char *label, michael@0: const char *data, uint32_t datalen) michael@0: { michael@0: if (!LOG4_ENABLED()) michael@0: return; michael@0: michael@0: LOG4(("SpdySession3::LogIO %p stream=%p id=0x%X [%s]", michael@0: self, stream, stream ? stream->StreamID() : 0, label)); michael@0: michael@0: // Max line is (16 * 3) + 10(prefix) + newline + null michael@0: char linebuf[128]; michael@0: uint32_t index; michael@0: char *line = linebuf; michael@0: michael@0: linebuf[127] = 0; michael@0: michael@0: for (index = 0; index < datalen; ++index) { michael@0: if (!(index % 16)) { michael@0: if (index) { michael@0: *line = 0; michael@0: LOG4(("%s", linebuf)); michael@0: } michael@0: line = linebuf; michael@0: PR_snprintf(line, 128, "%08X: ", index); michael@0: line += 10; michael@0: } michael@0: PR_snprintf(line, 128 - (line - linebuf), "%02X ", michael@0: ((unsigned char *)data)[index]); michael@0: line += 3; michael@0: } michael@0: if (index) { michael@0: *line = 0; michael@0: LOG4(("%s", linebuf)); michael@0: } michael@0: } michael@0: michael@0: bool michael@0: SpdySession3::RoomForMoreConcurrent() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: return (mConcurrent < mMaxConcurrent); michael@0: } michael@0: michael@0: bool michael@0: SpdySession3::RoomForMoreStreams() michael@0: { michael@0: if (mNextStreamID + mStreamTransactionHash.Count() * 2 > kMaxStreamID) michael@0: return false; michael@0: michael@0: return !mShouldGoAway; michael@0: } michael@0: michael@0: PRIntervalTime michael@0: SpdySession3::IdleTime() michael@0: { michael@0: return PR_IntervalNow() - mLastDataReadEpoch; michael@0: } michael@0: michael@0: uint32_t michael@0: SpdySession3::ReadTimeoutTick(PRIntervalTime now) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: MOZ_ASSERT(mNextPingID & 1, "Ping Counter Not Odd"); michael@0: michael@0: LOG(("SpdySession3::ReadTimeoutTick %p delta since last read %ds\n", michael@0: this, PR_IntervalToSeconds(now - mLastReadEpoch))); michael@0: michael@0: if (!mPingThreshold) michael@0: return UINT32_MAX; michael@0: michael@0: if ((now - mLastReadEpoch) < mPingThreshold) { michael@0: // recent activity means ping is not an issue michael@0: if (mPingSentEpoch) michael@0: mPingSentEpoch = 0; michael@0: michael@0: return PR_IntervalToSeconds(mPingThreshold) - michael@0: PR_IntervalToSeconds(now - mLastReadEpoch); michael@0: } michael@0: michael@0: if (mPingSentEpoch) { michael@0: LOG(("SpdySession3::ReadTimeoutTick %p handle outstanding ping\n")); michael@0: if ((now - mPingSentEpoch) >= gHttpHandler->SpdyPingTimeout()) { michael@0: LOG(("SpdySession3::ReadTimeoutTick %p Ping Timer Exhaustion\n", michael@0: this)); michael@0: mPingSentEpoch = 0; michael@0: Close(NS_ERROR_NET_TIMEOUT); michael@0: return UINT32_MAX; michael@0: } michael@0: return 1; // run the tick aggressively while ping is outstanding michael@0: } michael@0: michael@0: LOG(("SpdySession3::ReadTimeoutTick %p generating ping 0x%X\n", michael@0: this, mNextPingID)); michael@0: michael@0: if (mNextPingID == 0xffffffff) { michael@0: LOG(("SpdySession3::ReadTimeoutTick %p cannot form ping - ids exhausted\n", michael@0: this)); michael@0: return UINT32_MAX; michael@0: } michael@0: michael@0: mPingSentEpoch = PR_IntervalNow(); michael@0: if (!mPingSentEpoch) michael@0: mPingSentEpoch = 1; // avoid the 0 sentinel value michael@0: GeneratePing(mNextPingID); michael@0: mNextPingID += 2; michael@0: ResumeRecv(); // read the ping reply michael@0: michael@0: // Check for orphaned push streams. This looks expensive, but generally the michael@0: // list is empty. michael@0: SpdyPushedStream3 *deleteMe; michael@0: TimeStamp timestampNow; michael@0: do { michael@0: deleteMe = nullptr; michael@0: michael@0: for (uint32_t index = mPushedStreams.Length(); michael@0: index > 0 ; --index) { michael@0: SpdyPushedStream3 *pushedStream = mPushedStreams[index - 1]; michael@0: michael@0: if (timestampNow.IsNull()) michael@0: timestampNow = TimeStamp::Now(); // lazy initializer michael@0: michael@0: // if spdy finished, but not connected, and its been like that for too long.. michael@0: // cleanup the stream.. michael@0: if (pushedStream->IsOrphaned(timestampNow)) michael@0: { michael@0: LOG3(("SpdySession3 Timeout Pushed Stream %p 0x%X\n", michael@0: this, pushedStream->StreamID())); michael@0: deleteMe = pushedStream; michael@0: break; // don't CleanupStream() while iterating this vector michael@0: } michael@0: } michael@0: if (deleteMe) michael@0: CleanupStream(deleteMe, NS_ERROR_ABORT, RST_CANCEL); michael@0: michael@0: } while (deleteMe); michael@0: michael@0: if (mNextPingID == 0xffffffff) { michael@0: LOG(("SpdySession3::ReadTimeoutTick %p " michael@0: "ping ids exhausted marking goaway\n", this)); michael@0: mShouldGoAway = true; michael@0: } michael@0: return 1; // run the tick aggressively while ping is outstanding michael@0: } michael@0: michael@0: uint32_t michael@0: SpdySession3::RegisterStreamID(SpdyStream3 *stream, uint32_t aNewID) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: MOZ_ASSERT(mNextStreamID < 0xfffffff0, michael@0: "should have stopped admitting streams"); michael@0: michael@0: MOZ_ASSERT(!(aNewID & 1), michael@0: "0 for autoassign pull, otherwise explicit even push assignment"); michael@0: if (!aNewID) { michael@0: // auto generate a new pull stream ID michael@0: aNewID = mNextStreamID; michael@0: MOZ_ASSERT(aNewID & 1, "pull ID must be odd."); michael@0: mNextStreamID += 2; michael@0: } michael@0: michael@0: LOG3(("SpdySession3::RegisterStreamID session=%p stream=%p id=0x%X " michael@0: "concurrent=%d",this, stream, aNewID, mConcurrent)); michael@0: michael@0: // We've used up plenty of ID's on this session. Start michael@0: // moving to a new one before there is a crunch involving michael@0: // server push streams or concurrent non-registered submits michael@0: if (aNewID >= kMaxStreamID) michael@0: mShouldGoAway = true; michael@0: michael@0: // integrity check michael@0: if (mStreamIDHash.Get(aNewID)) { michael@0: LOG3((" New ID already present\n")); michael@0: MOZ_ASSERT(false, "New ID already present in mStreamIDHash"); michael@0: mShouldGoAway = true; michael@0: return kDeadStreamID; michael@0: } michael@0: michael@0: mStreamIDHash.Put(aNewID, stream); michael@0: return aNewID; michael@0: } michael@0: michael@0: bool michael@0: SpdySession3::AddStream(nsAHttpTransaction *aHttpTransaction, michael@0: int32_t aPriority) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: // integrity check michael@0: if (mStreamTransactionHash.Get(aHttpTransaction)) { michael@0: LOG3((" New transaction already present\n")); michael@0: MOZ_ASSERT(false, "AddStream duplicate transaction pointer"); michael@0: return false; michael@0: } michael@0: michael@0: aHttpTransaction->SetConnection(this); michael@0: SpdyStream3 *stream = new SpdyStream3(aHttpTransaction, this, aPriority); michael@0: michael@0: LOG3(("SpdySession3::AddStream session=%p stream=%p NextID=0x%X (tentative)", michael@0: this, stream, mNextStreamID)); michael@0: michael@0: mStreamTransactionHash.Put(aHttpTransaction, stream); michael@0: michael@0: if (RoomForMoreConcurrent()) { michael@0: LOG3(("SpdySession3::AddStream %p stream %p activated immediately.", michael@0: this, stream)); michael@0: ActivateStream(stream); michael@0: } michael@0: else { michael@0: LOG3(("SpdySession3::AddStream %p stream %p queued.", this, stream)); michael@0: mQueuedStreams.Push(stream); michael@0: } michael@0: michael@0: if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE)) { michael@0: LOG3(("SpdySession3::AddStream %p transaction %p forces keep-alive off.\n", michael@0: this, aHttpTransaction)); michael@0: DontReuse(); michael@0: } michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::ActivateStream(SpdyStream3 *stream) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1), michael@0: "Do not activate pushed streams"); michael@0: michael@0: ++mConcurrent; michael@0: if (mConcurrent > mConcurrentHighWater) michael@0: mConcurrentHighWater = mConcurrent; michael@0: LOG3(("SpdySession3::AddStream %p activating stream %p Currently %d " michael@0: "streams in session, high water mark is %d", michael@0: this, stream, mConcurrent, mConcurrentHighWater)); michael@0: michael@0: mReadyForWrite.Push(stream); michael@0: SetWriteCallbacks(); michael@0: michael@0: // Kick off the SYN transmit without waiting for the poll loop michael@0: // This won't work for stream id=1 because there is no segment reader michael@0: // yet. michael@0: if (mSegmentReader) { michael@0: uint32_t countRead; michael@0: ReadSegments(nullptr, kDefaultBufferSize, &countRead); michael@0: } michael@0: } michael@0: michael@0: void michael@0: SpdySession3::ProcessPending() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: while (RoomForMoreConcurrent()) { michael@0: SpdyStream3 *stream = static_cast(mQueuedStreams.PopFront()); michael@0: if (!stream) michael@0: return; michael@0: LOG3(("SpdySession3::ProcessPending %p stream %p activated from queue.", michael@0: this, stream)); michael@0: ActivateStream(stream); michael@0: } michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::NetworkRead(nsAHttpSegmentWriter *writer, char *buf, michael@0: uint32_t count, uint32_t *countWritten) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: if (!count) { michael@0: *countWritten = 0; michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult rv = writer->OnWriteSegment(buf, count, countWritten); michael@0: if (NS_SUCCEEDED(rv) && *countWritten > 0) michael@0: mLastReadEpoch = PR_IntervalNow(); michael@0: return rv; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::SetWriteCallbacks() michael@0: { michael@0: if (mConnection && (GetWriteQueueSize() || mOutputQueueUsed)) michael@0: mConnection->ResumeSend(); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::RealignOutputQueue() michael@0: { michael@0: mOutputQueueUsed -= mOutputQueueSent; michael@0: memmove(mOutputQueueBuffer.get(), michael@0: mOutputQueueBuffer.get() + mOutputQueueSent, michael@0: mOutputQueueUsed); michael@0: mOutputQueueSent = 0; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::FlushOutputQueue() michael@0: { michael@0: if (!mSegmentReader || !mOutputQueueUsed) michael@0: return; michael@0: michael@0: nsresult rv; michael@0: uint32_t countRead; michael@0: uint32_t avail = mOutputQueueUsed - mOutputQueueSent; michael@0: michael@0: rv = mSegmentReader-> michael@0: OnReadSegment(mOutputQueueBuffer.get() + mOutputQueueSent, avail, michael@0: &countRead); michael@0: LOG3(("SpdySession3::FlushOutputQueue %p sz=%d rv=%x actual=%d", michael@0: this, avail, rv, countRead)); michael@0: michael@0: // Dont worry about errors on write, we will pick this up as a read error too michael@0: if (NS_FAILED(rv)) michael@0: return; michael@0: michael@0: if (countRead == avail) { michael@0: mOutputQueueUsed = 0; michael@0: mOutputQueueSent = 0; michael@0: return; michael@0: } michael@0: michael@0: mOutputQueueSent += countRead; michael@0: michael@0: // If the output queue is close to filling up and we have sent out a good michael@0: // chunk of data from the beginning then realign it. michael@0: michael@0: if ((mOutputQueueSent >= kQueueMinimumCleanup) && michael@0: ((mOutputQueueSize - mOutputQueueUsed) < kQueueTailRoom)) { michael@0: RealignOutputQueue(); michael@0: } michael@0: } michael@0: michael@0: void michael@0: SpdySession3::DontReuse() michael@0: { michael@0: mShouldGoAway = true; michael@0: if (!mStreamTransactionHash.Count()) michael@0: Close(NS_OK); michael@0: } michael@0: michael@0: uint32_t michael@0: SpdySession3::GetWriteQueueSize() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: return mReadyForWrite.GetSize(); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::ChangeDownstreamState(enum stateType newState) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: LOG3(("SpdyStream3::ChangeDownstreamState() %p from %X to %X", michael@0: this, mDownstreamState, newState)); michael@0: mDownstreamState = newState; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::ResetDownstreamState() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: LOG3(("SpdyStream3::ResetDownstreamState() %p", this)); michael@0: ChangeDownstreamState(BUFFERING_FRAME_HEADER); michael@0: michael@0: if (mInputFrameDataLast && mInputFrameDataStream) { michael@0: mInputFrameDataLast = false; michael@0: if (!mInputFrameDataStream->RecvdFin()) { michael@0: LOG3((" SetRecvdFin id=0x%x\n", mInputFrameDataStream->StreamID())); michael@0: mInputFrameDataStream->SetRecvdFin(true); michael@0: DecrementConcurrent(mInputFrameDataStream); michael@0: } michael@0: } michael@0: mInputFrameBufferUsed = 0; michael@0: mInputFrameDataStream = nullptr; michael@0: } michael@0: michael@0: template void michael@0: SpdySession3::EnsureBuffer(nsAutoArrayPtr &buf, michael@0: uint32_t newSize, michael@0: uint32_t preserve, michael@0: uint32_t &objSize) michael@0: { michael@0: if (objSize >= newSize) michael@0: return; michael@0: michael@0: // Leave a little slop on the new allocation - add 2KB to michael@0: // what we need and then round the result up to a 4KB (page) michael@0: // boundary. michael@0: michael@0: objSize = (newSize + 2048 + 4095) & ~4095; michael@0: michael@0: static_assert(sizeof(T) == 1, "sizeof(T) must be 1"); michael@0: nsAutoArrayPtr tmp(new T[objSize]); michael@0: memcpy(tmp, buf, preserve); michael@0: buf = tmp; michael@0: } michael@0: michael@0: // Instantiate supported templates explicitly. michael@0: template void michael@0: SpdySession3::EnsureBuffer(nsAutoArrayPtr &buf, michael@0: uint32_t newSize, michael@0: uint32_t preserve, michael@0: uint32_t &objSize); michael@0: michael@0: template void michael@0: SpdySession3::EnsureBuffer(nsAutoArrayPtr &buf, michael@0: uint32_t newSize, michael@0: uint32_t preserve, michael@0: uint32_t &objSize); michael@0: michael@0: void michael@0: SpdySession3::DecrementConcurrent(SpdyStream3 *aStream) michael@0: { michael@0: uint32_t id = aStream->StreamID(); michael@0: michael@0: if (id && !(id & 0x1)) michael@0: return; // pushed streams aren't counted in concurrent limit michael@0: michael@0: MOZ_ASSERT(mConcurrent); michael@0: --mConcurrent; michael@0: LOG3(("DecrementConcurrent %p id=0x%X concurrent=%d\n", michael@0: this, id, mConcurrent)); michael@0: ProcessPending(); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::zlibInit() michael@0: { michael@0: mDownstreamZlib.zalloc = SpdyZlibReporter::Alloc; michael@0: mDownstreamZlib.zfree = SpdyZlibReporter::Free; michael@0: mDownstreamZlib.opaque = Z_NULL; michael@0: michael@0: inflateInit(&mDownstreamZlib); michael@0: michael@0: mUpstreamZlib.zalloc = SpdyZlibReporter::Alloc; michael@0: mUpstreamZlib.zfree = SpdyZlibReporter::Free; michael@0: mUpstreamZlib.opaque = Z_NULL; michael@0: michael@0: // mixing carte blanche compression with tls subjects us to traffic michael@0: // analysis attacks michael@0: deflateInit(&mUpstreamZlib, Z_NO_COMPRESSION); michael@0: deflateSetDictionary(&mUpstreamZlib, michael@0: SpdyStream3::kDictionary, michael@0: sizeof(SpdyStream3::kDictionary)); michael@0: } michael@0: michael@0: // Need to decompress some data in order to keep the compression michael@0: // context correct, but we really don't care what the result is michael@0: nsresult michael@0: SpdySession3::UncompressAndDiscard(uint32_t offset, michael@0: uint32_t blockLen) michael@0: { michael@0: char *blockStart = mInputFrameBuffer + offset; michael@0: unsigned char trash[2048]; michael@0: mDownstreamZlib.avail_in = blockLen; michael@0: mDownstreamZlib.next_in = reinterpret_cast(blockStart); michael@0: bool triedDictionary = false; michael@0: michael@0: do { michael@0: mDownstreamZlib.next_out = trash; michael@0: mDownstreamZlib.avail_out = sizeof(trash); michael@0: int zlib_rv = inflate(&mDownstreamZlib, Z_NO_FLUSH); michael@0: michael@0: if (zlib_rv == Z_NEED_DICT) { michael@0: if (triedDictionary) { michael@0: LOG3(("SpdySession3::UncompressAndDiscard %p Dictionary Error\n", this)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: triedDictionary = true; michael@0: inflateSetDictionary(&mDownstreamZlib, SpdyStream3::kDictionary, michael@0: sizeof(SpdyStream3::kDictionary)); michael@0: } michael@0: michael@0: if (zlib_rv == Z_DATA_ERROR) michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: michael@0: if (zlib_rv == Z_MEM_ERROR) michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: while (mDownstreamZlib.avail_in); michael@0: return NS_OK; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::GeneratePing(uint32_t aID) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::GeneratePing %p 0x%X\n", this, aID)); michael@0: michael@0: EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 12, michael@0: mOutputQueueUsed, mOutputQueueSize); michael@0: char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed; michael@0: mOutputQueueUsed += 12; michael@0: michael@0: packet[0] = kFlag_Control; michael@0: packet[1] = kVersion; michael@0: packet[2] = 0; michael@0: packet[3] = CONTROL_TYPE_PING; michael@0: packet[4] = 0; /* flags */ michael@0: packet[5] = 0; michael@0: packet[6] = 0; michael@0: packet[7] = 4; /* length */ michael@0: michael@0: NetworkEndian::writeUint32(packet + 8, aID); michael@0: michael@0: LogIO(this, nullptr, "Generate Ping", packet, 12); michael@0: FlushOutputQueue(); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::GenerateRstStream(uint32_t aStatusCode, uint32_t aID) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::GenerateRst %p 0x%X %d\n", this, aID, aStatusCode)); michael@0: michael@0: EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 16, michael@0: mOutputQueueUsed, mOutputQueueSize); michael@0: char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed; michael@0: mOutputQueueUsed += 16; michael@0: michael@0: packet[0] = kFlag_Control; michael@0: packet[1] = kVersion; michael@0: packet[2] = 0; michael@0: packet[3] = CONTROL_TYPE_RST_STREAM; michael@0: packet[4] = 0; /* flags */ michael@0: packet[5] = 0; michael@0: packet[6] = 0; michael@0: packet[7] = 8; /* length */ michael@0: michael@0: NetworkEndian::writeUint32(packet + 8, aID); michael@0: NetworkEndian::writeUint32(packet + 12, aStatusCode); michael@0: michael@0: LogIO(this, nullptr, "Generate Reset", packet, 16); michael@0: FlushOutputQueue(); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::GenerateGoAway(uint32_t aStatusCode) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::GenerateGoAway %p code=%X\n", this, aStatusCode)); michael@0: michael@0: EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 16, michael@0: mOutputQueueUsed, mOutputQueueSize); michael@0: char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed; michael@0: mOutputQueueUsed += 16; michael@0: michael@0: memset(packet, 0, 16); michael@0: packet[0] = kFlag_Control; michael@0: packet[1] = kVersion; michael@0: packet[3] = CONTROL_TYPE_GOAWAY; michael@0: packet[7] = 8; /* data length */ michael@0: michael@0: // last-good-stream-id are bytes 8-11, when we accept server push this will michael@0: // need to be set non zero michael@0: michael@0: // bytes 12-15 are the status code. michael@0: NetworkEndian::writeUint32(packet + 12, aStatusCode); michael@0: michael@0: LogIO(this, nullptr, "Generate GoAway", packet, 16); michael@0: FlushOutputQueue(); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::GenerateSettings() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::GenerateSettings %p\n", this)); michael@0: michael@0: static const uint32_t maxDataLen = 4 + 3 * 8; // sized for 3 settings michael@0: EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 8 + maxDataLen, michael@0: mOutputQueueUsed, mOutputQueueSize); michael@0: char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed; michael@0: michael@0: memset(packet, 0, 8 + maxDataLen); michael@0: packet[0] = kFlag_Control; michael@0: packet[1] = kVersion; michael@0: packet[3] = CONTROL_TYPE_SETTINGS; michael@0: michael@0: uint8_t numberOfEntries = 0; michael@0: michael@0: // entries need to be listed in order by ID michael@0: // 1st entry is bytes 12 to 19 michael@0: // 2nd entry is bytes 20 to 27 michael@0: // 3rd entry is bytes 28 to 35 michael@0: michael@0: if (!gHttpHandler->AllowPush()) { michael@0: // announcing that we accept 0 incoming streams is done to michael@0: // disable server push michael@0: packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_MAX_CONCURRENT; michael@0: // The value portion of the setting pair is already initialized to 0 michael@0: numberOfEntries++; michael@0: } michael@0: michael@0: nsRefPtr ci; michael@0: uint32_t cwnd = 0; michael@0: GetConnectionInfo(getter_AddRefs(ci)); michael@0: if (ci) michael@0: cwnd = gHttpHandler->ConnMgr()->GetSpdyCWNDSetting(ci); michael@0: if (cwnd) { michael@0: packet[12 + 8 * numberOfEntries] = PERSISTED_VALUE; michael@0: packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_CWND; michael@0: LOG(("SpdySession3::GenerateSettings %p sending CWND %u\n", this, cwnd)); michael@0: NetworkEndian::writeUint32(packet + 16 + 8 * numberOfEntries, cwnd); michael@0: numberOfEntries++; michael@0: } michael@0: michael@0: // Advertise the Push RWIN and on each client SYN_STREAM pipeline michael@0: // a window update with it in order to use larger initial windows with pulled michael@0: // streams. michael@0: packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_INITIAL_WINDOW; michael@0: NetworkEndian::writeUint32(packet + 16 + 8 * numberOfEntries, mPushAllowance); michael@0: numberOfEntries++; michael@0: michael@0: uint32_t dataLen = 4 + 8 * numberOfEntries; michael@0: mOutputQueueUsed += 8 + dataLen; michael@0: packet[7] = dataLen; michael@0: packet[11] = numberOfEntries; michael@0: michael@0: LogIO(this, nullptr, "Generate Settings", packet, 8 + dataLen); michael@0: FlushOutputQueue(); michael@0: } michael@0: michael@0: // perform a bunch of integrity checks on the stream. michael@0: // returns true if passed, false (plus LOG and ABORT) if failed. michael@0: bool michael@0: SpdySession3::VerifyStream(SpdyStream3 *aStream, uint32_t aOptionalID = 0) michael@0: { michael@0: // This is annoying, but at least it is O(1) michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: #ifndef DEBUG michael@0: // Only do the real verification in debug builds michael@0: return true; michael@0: #endif michael@0: michael@0: if (!aStream) michael@0: return true; michael@0: michael@0: uint32_t test = 0; michael@0: michael@0: do { michael@0: if (aStream->StreamID() == kDeadStreamID) michael@0: break; michael@0: michael@0: nsAHttpTransaction *trans = aStream->Transaction(); michael@0: michael@0: test++; michael@0: if (!trans) michael@0: break; michael@0: michael@0: test++; michael@0: if (mStreamTransactionHash.Get(trans) != aStream) michael@0: break; michael@0: michael@0: if (aStream->StreamID()) { michael@0: SpdyStream3 *idStream = mStreamIDHash.Get(aStream->StreamID()); michael@0: michael@0: test++; michael@0: if (idStream != aStream) michael@0: break; michael@0: michael@0: if (aOptionalID) { michael@0: test++; michael@0: if (idStream->StreamID() != aOptionalID) michael@0: break; michael@0: } michael@0: } michael@0: michael@0: // tests passed michael@0: return true; michael@0: } while (0); michael@0: michael@0: LOG(("SpdySession3 %p VerifyStream Failure %p stream->id=0x%X " michael@0: "optionalID=0x%X trans=%p test=%d\n", michael@0: this, aStream, aStream->StreamID(), michael@0: aOptionalID, aStream->Transaction(), test)); michael@0: michael@0: MOZ_ASSERT(false, "VerifyStream"); michael@0: return false; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::CleanupStream(SpdyStream3 *aStream, nsresult aResult, michael@0: rstReason aResetCode) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::CleanupStream %p %p 0x%X %X\n", michael@0: this, aStream, aStream ? aStream->StreamID() : 0, aResult)); michael@0: if (!aStream) { michael@0: return; michael@0: } michael@0: michael@0: SpdyPushedStream3 *pushSource = nullptr; michael@0: michael@0: if (NS_SUCCEEDED(aResult) && aStream->DeferCleanupOnSuccess()) { michael@0: LOG(("SpdySession3::CleanupStream 0x%X deferred\n", aStream->StreamID())); michael@0: return; michael@0: } michael@0: michael@0: if (!VerifyStream(aStream)) { michael@0: LOG(("SpdySession3::CleanupStream failed to verify stream\n")); michael@0: return; michael@0: } michael@0: michael@0: pushSource = aStream->PushSource(); michael@0: michael@0: if (!aStream->RecvdFin() && aStream->StreamID()) { michael@0: LOG3(("Stream had not processed recv FIN, sending RST code %X\n", michael@0: aResetCode)); michael@0: GenerateRstStream(aResetCode, aStream->StreamID()); michael@0: DecrementConcurrent(aStream); michael@0: } michael@0: michael@0: CloseStream(aStream, aResult); michael@0: michael@0: // Remove the stream from the ID hash table and, if an even id, the pushed michael@0: // table too. michael@0: uint32_t id = aStream->StreamID(); michael@0: if (id > 0) { michael@0: mStreamIDHash.Remove(id); michael@0: if (!(id & 1)) michael@0: mPushedStreams.RemoveElement(aStream); michael@0: } michael@0: michael@0: RemoveStreamFromQueues(aStream); michael@0: michael@0: // removing from the stream transaction hash will michael@0: // delete the SpdyStream3 and drop the reference to michael@0: // its transaction michael@0: mStreamTransactionHash.Remove(aStream->Transaction()); michael@0: michael@0: if (mShouldGoAway && !mStreamTransactionHash.Count()) michael@0: Close(NS_OK); michael@0: michael@0: if (pushSource) { michael@0: pushSource->SetDeferCleanupOnSuccess(false); michael@0: CleanupStream(pushSource, aResult, aResetCode); michael@0: } michael@0: } michael@0: michael@0: static void RemoveStreamFromQueue(SpdyStream3 *aStream, nsDeque &queue) michael@0: { michael@0: uint32_t size = queue.GetSize(); michael@0: for (uint32_t count = 0; count < size; ++count) { michael@0: SpdyStream3 *stream = static_cast(queue.PopFront()); michael@0: if (stream != aStream) michael@0: queue.Push(stream); michael@0: } michael@0: } michael@0: michael@0: void michael@0: SpdySession3::RemoveStreamFromQueues(SpdyStream3 *aStream) michael@0: { michael@0: RemoveStreamFromQueue(aStream, mReadyForWrite); michael@0: RemoveStreamFromQueue(aStream, mQueuedStreams); michael@0: RemoveStreamFromQueue(aStream, mReadyForRead); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::CloseStream(SpdyStream3 *aStream, nsresult aResult) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::CloseStream %p %p 0x%x %X\n", michael@0: this, aStream, aStream->StreamID(), aResult)); michael@0: michael@0: // Check if partial frame reader michael@0: if (aStream == mInputFrameDataStream) { michael@0: LOG3(("Stream had active partial read frame on close")); michael@0: ChangeDownstreamState(DISCARDING_DATA_FRAME); michael@0: mInputFrameDataStream = nullptr; michael@0: } michael@0: michael@0: RemoveStreamFromQueues(aStream); michael@0: michael@0: // Send the stream the close() indication michael@0: aStream->Close(aResult); michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleSynStream(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_SYN_STREAM); michael@0: michael@0: if (self->mInputFrameDataSize < 18) { michael@0: LOG3(("SpdySession3::HandleSynStream %p SYN_STREAM too short data=%d", michael@0: self, self->mInputFrameDataSize)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: uint32_t streamID = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 2 * sizeof(uint32_t)); michael@0: uint32_t associatedID = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 3 * sizeof(uint32_t)); michael@0: uint8_t flags = reinterpret_cast(self->mInputFrameBuffer.get())[4]; michael@0: michael@0: LOG3(("SpdySession3::HandleSynStream %p recv SYN_STREAM (push) " michael@0: "for ID 0x%X associated with 0x%X.\n", michael@0: self, streamID, associatedID)); michael@0: michael@0: if (streamID & 0x01) { // test for odd stream ID michael@0: LOG3(("SpdySession3::HandleSynStream %p recvd SYN_STREAM id must be even.", michael@0: self)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: // confirm associated-to michael@0: nsresult rv = self->SetInputFrameDataStream(associatedID); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: SpdyStream3 *associatedStream = self->mInputFrameDataStream; michael@0: michael@0: ++(self->mServerPushedResources); michael@0: michael@0: // Anytime we start using the high bit of stream ID (either client or server) michael@0: // begin to migrate to a new session. michael@0: if (streamID >= kMaxStreamID) michael@0: self->mShouldGoAway = true; michael@0: michael@0: bool resetStream = true; michael@0: SpdyPushCache *cache = nullptr; michael@0: michael@0: if (!(flags & kFlag_Data_UNI)) { michael@0: // pushed streams require UNIDIRECTIONAL flag michael@0: LOG3(("SpdySession3::HandleSynStream %p ID %0x%X associated ID 0x%X failed.\n", michael@0: self, streamID, associatedID)); michael@0: self->GenerateRstStream(RST_PROTOCOL_ERROR, streamID); michael@0: michael@0: } else if (!associatedID) { michael@0: // associated stream 0 will never find a match, but the spec requires a michael@0: // PROTOCOL_ERROR in this specific case michael@0: LOG3(("SpdySession3::HandleSynStream %p associated ID of 0 failed.\n", self)); michael@0: self->GenerateRstStream(RST_PROTOCOL_ERROR, streamID); michael@0: michael@0: } else if (!gHttpHandler->AllowPush()) { michael@0: // MAX_CONCURRENT_STREAMS of 0 in settings should have disabled push, michael@0: // but some servers are buggy about that.. or the config could have michael@0: // been updated after the settings frame was sent. In both cases just michael@0: // reject the pushed stream as refused michael@0: LOG3(("SpdySession3::HandleSynStream Push Recevied when Disabled\n")); michael@0: self->GenerateRstStream(RST_REFUSED_STREAM, streamID); michael@0: michael@0: } else if (!associatedStream) { michael@0: LOG3(("SpdySession3::HandleSynStream %p lookup associated ID failed.\n", self)); michael@0: self->GenerateRstStream(RST_INVALID_STREAM, streamID); michael@0: michael@0: } else { michael@0: nsILoadGroupConnectionInfo *loadGroupCI = associatedStream->LoadGroupConnectionInfo(); michael@0: if (loadGroupCI) { michael@0: loadGroupCI->GetSpdyPushCache(&cache); michael@0: if (!cache) { michael@0: cache = new SpdyPushCache(); michael@0: if (!cache || NS_FAILED(loadGroupCI->SetSpdyPushCache(cache))) { michael@0: delete cache; michael@0: cache = nullptr; michael@0: } michael@0: } michael@0: } michael@0: if (!cache) { michael@0: // this is unexpected, but we can handle it just be refusing the push michael@0: LOG3(("SpdySession3::HandleSynStream Push Recevied without loadgroup cache\n")); michael@0: self->GenerateRstStream(RST_REFUSED_STREAM, streamID); michael@0: } michael@0: else { michael@0: resetStream = false; michael@0: } michael@0: } michael@0: michael@0: if (resetStream) { michael@0: // Need to decompress the headers even though we aren't using them yet in michael@0: // order to keep the compression context consistent for other syn_reply frames michael@0: rv = self->UncompressAndDiscard(18, self->mInputFrameDataSize - 10); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("SpdySession3::HandleSynStream uncompress failed\n")); michael@0: return rv; michael@0: } michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: // Create the buffering transaction and push stream michael@0: nsRefPtr transactionBuffer = michael@0: new SpdyPush3TransactionBuffer(); michael@0: transactionBuffer->SetConnection(self); michael@0: SpdyPushedStream3 *pushedStream = michael@0: new SpdyPushedStream3(transactionBuffer, self, michael@0: associatedStream, streamID); michael@0: michael@0: // ownership of the pushed stream is by the transaction hash, just as it michael@0: // is for a client initiated stream. Errors that aren't fatal to the michael@0: // whole session must call cleanupStream() after this point in order michael@0: // to remove the stream from that hash. michael@0: self->mStreamTransactionHash.Put(transactionBuffer, pushedStream); michael@0: self->mPushedStreams.AppendElement(pushedStream); michael@0: michael@0: // The pushed stream is unidirectional so it is fully open immediately michael@0: pushedStream->SetFullyOpen(); michael@0: michael@0: // Uncompress the response headers into a stream specific buffer, leaving them michael@0: // in spdy format for the time being. michael@0: rv = pushedStream->Uncompress(&self->mDownstreamZlib, michael@0: self->mInputFrameBuffer + 18, michael@0: self->mInputFrameDataSize - 10); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("SpdySession3::HandleSynStream uncompress failed\n")); michael@0: return rv; michael@0: } michael@0: michael@0: if (self->RegisterStreamID(pushedStream, streamID) == kDeadStreamID) { michael@0: LOG(("SpdySession3::HandleSynStream registerstreamid failed\n")); michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: // Fake the request side of the pushed HTTP transaction. Sets up hash michael@0: // key and origin michael@0: uint32_t notUsed; michael@0: pushedStream->ReadSegments(nullptr, 1, ¬Used); michael@0: michael@0: nsAutoCString key; michael@0: if (!pushedStream->GetHashKey(key)) { michael@0: LOG(("SpdySession3::HandleSynStream one of :host :scheme :path missing from push\n")); michael@0: self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (!associatedStream->Origin().Equals(pushedStream->Origin())) { michael@0: LOG(("SpdySession3::HandleSynStream pushed stream mismatched origin\n")); michael@0: self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (!cache->RegisterPushedStreamSpdy3(key, pushedStream)) { michael@0: LOG(("SpdySession3::HandleSynStream registerPushedStream Failed\n")); michael@0: self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::SetInputFrameDataStream(uint32_t streamID) michael@0: { michael@0: mInputFrameDataStream = mStreamIDHash.Get(streamID); michael@0: if (VerifyStream(mInputFrameDataStream, streamID)) michael@0: return NS_OK; michael@0: michael@0: LOG(("SpdySession3::SetInputFrameDataStream failed to verify 0x%X\n", michael@0: streamID)); michael@0: mInputFrameDataStream = nullptr; michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleSynReply(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_SYN_REPLY); michael@0: michael@0: if (self->mInputFrameDataSize < 4) { michael@0: LOG3(("SpdySession3::HandleSynReply %p SYN REPLY too short data=%d", michael@0: self, self->mInputFrameDataSize)); michael@0: // A framing error is a session wide error that cannot be recovered michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: LOG3(("SpdySession3::HandleSynReply %p lookup via streamID in syn_reply.\n", michael@0: self)); michael@0: uint32_t streamID = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 2 * sizeof(uint32_t)); michael@0: nsresult rv = self->SetInputFrameDataStream(streamID); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: if (!self->mInputFrameDataStream) { michael@0: // Cannot find stream. We can continue the SPDY session, but we need to michael@0: // uncompress the header block to maintain the correct compression context michael@0: michael@0: LOG3(("SpdySession3::HandleSynReply %p lookup streamID in syn_reply " michael@0: "0x%X failed. NextStreamID = 0x%X\n", michael@0: self, streamID, self->mNextStreamID)); michael@0: michael@0: if (streamID >= self->mNextStreamID) michael@0: self->GenerateRstStream(RST_INVALID_STREAM, streamID); michael@0: michael@0: rv = self->UncompressAndDiscard(12, self->mInputFrameDataSize - 4); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("SpdySession3::HandleSynReply uncompress failed\n")); michael@0: // this is fatal to the session michael@0: return rv; michael@0: } michael@0: michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: // Uncompress the headers into a stream specific buffer, leaving them in michael@0: // spdy format for the time being. Make certain to do this michael@0: // step before any error handling that might abort the stream but not michael@0: // the session becuase the session compression context will become michael@0: // inconsistent if all of the compressed data is not processed. michael@0: rv = self->mInputFrameDataStream->Uncompress(&self->mDownstreamZlib, michael@0: self->mInputFrameBuffer + 12, michael@0: self->mInputFrameDataSize - 4); michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("SpdySession3::HandleSynReply uncompress failed\n")); michael@0: return rv; michael@0: } michael@0: michael@0: if (self->mInputFrameDataStream->GetFullyOpen()) { michael@0: // "If an endpoint receives multiple SYN_REPLY frames for the same active michael@0: // stream ID, it MUST issue a stream error (Section 2.4.2) with the error michael@0: // code STREAM_IN_USE." michael@0: // michael@0: // "STREAM_ALREADY_CLOSED. The endpoint received a data or SYN_REPLY michael@0: // frame for a stream which is half closed." michael@0: // michael@0: // If the stream is open then just RST_STREAM with STREAM_IN_USE michael@0: // If the stream is half closed then RST_STREAM with STREAM_ALREADY_CLOSED michael@0: // abort the session michael@0: // michael@0: LOG3(("SpdySession3::HandleSynReply %p dup SYN_REPLY for 0x%X" michael@0: " recvdfin=%d", self, self->mInputFrameDataStream->StreamID(), michael@0: self->mInputFrameDataStream->RecvdFin())); michael@0: michael@0: self->CleanupStream(self->mInputFrameDataStream, NS_ERROR_ALREADY_OPENED, michael@0: self->mInputFrameDataStream->RecvdFin() ? michael@0: RST_STREAM_ALREADY_CLOSED : RST_STREAM_IN_USE); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: self->mInputFrameDataStream->SetFullyOpen(); michael@0: michael@0: self->mInputFrameDataLast = self->mInputFrameBuffer[4] & kFlag_Data_FIN; michael@0: self->mInputFrameDataStream->UpdateTransportReadEvents(self->mInputFrameDataSize); michael@0: self->mLastDataReadEpoch = self->mLastReadEpoch; michael@0: michael@0: if (self->mInputFrameBuffer[4] & ~kFlag_Data_FIN) { michael@0: LOG3(("SynReply %p had undefined flag set 0x%X\n", self, streamID)); michael@0: self->CleanupStream(self->mInputFrameDataStream, NS_ERROR_ILLEGAL_VALUE, michael@0: RST_PROTOCOL_ERROR); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (!self->mInputFrameDataLast) { michael@0: // don't process the headers yet as there could be more coming from HEADERS michael@0: // frames michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: rv = self->ResponseHeadersComplete(); michael@0: if (rv == NS_ERROR_ILLEGAL_VALUE) { michael@0: LOG3(("SpdySession3::HandleSynReply %p PROTOCOL_ERROR detected 0x%X\n", michael@0: self, streamID)); michael@0: self->CleanupStream(self->mInputFrameDataStream, rv, RST_PROTOCOL_ERROR); michael@0: self->ResetDownstreamState(); michael@0: rv = NS_OK; michael@0: } michael@0: return rv; michael@0: } michael@0: michael@0: // ResponseHeadersComplete() returns NS_ERROR_ILLEGAL_VALUE when the stream michael@0: // should be reset with a PROTOCOL_ERROR, NS_OK when the SYN_REPLY was michael@0: // fine, and any other error is fatal to the session. michael@0: nsresult michael@0: SpdySession3::ResponseHeadersComplete() michael@0: { michael@0: LOG3(("SpdySession3::ResponseHeadersComplete %p for 0x%X fin=%d", michael@0: this, mInputFrameDataStream->StreamID(), mInputFrameDataLast)); michael@0: michael@0: // The spdystream needs to see flattened http headers michael@0: // Uncompressed spdy format headers currently live in michael@0: // SpdyStream3::mDecompressBuffer - convert that to HTTP format in michael@0: // mFlatHTTPResponseHeaders via ConvertHeaders() michael@0: michael@0: mFlatHTTPResponseHeadersOut = 0; michael@0: nsresult rv = mInputFrameDataStream->ConvertHeaders(mFlatHTTPResponseHeaders); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: ChangeDownstreamState(PROCESSING_COMPLETE_HEADERS); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleRstStream(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_RST_STREAM); michael@0: michael@0: if (self->mInputFrameDataSize != 8) { michael@0: LOG3(("SpdySession3::HandleRstStream %p RST_STREAM wrong length data=%d", michael@0: self, self->mInputFrameDataSize)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: uint8_t flags = reinterpret_cast(self->mInputFrameBuffer.get())[4]; michael@0: michael@0: uint32_t streamID = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 2 * sizeof(uint32_t)); michael@0: michael@0: self->mDownstreamRstReason = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 3 * sizeof(uint32_t)); michael@0: michael@0: LOG3(("SpdySession3::HandleRstStream %p RST_STREAM Reason Code %u ID %x " michael@0: "flags %x", self, self->mDownstreamRstReason, streamID, flags)); michael@0: michael@0: if (flags != 0) { michael@0: LOG3(("SpdySession3::HandleRstStream %p RST_STREAM with flags is illegal", michael@0: self)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: if (self->mDownstreamRstReason == RST_INVALID_STREAM || michael@0: self->mDownstreamRstReason == RST_STREAM_IN_USE || michael@0: self->mDownstreamRstReason == RST_FLOW_CONTROL_ERROR) { michael@0: // basically just ignore this michael@0: LOG3(("SpdySession3::HandleRstStream %p No Reset Processing Needed.\n")); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult rv = self->SetInputFrameDataStream(streamID); michael@0: michael@0: if (!self->mInputFrameDataStream) { michael@0: if (NS_FAILED(rv)) michael@0: LOG(("SpdySession3::HandleRstStream %p lookup streamID for RST Frame " michael@0: "0x%X failed reason = %d :: VerifyStream Failed\n", self, streamID, michael@0: self->mDownstreamRstReason)); michael@0: michael@0: LOG3(("SpdySession3::HandleRstStream %p lookup streamID for RST Frame " michael@0: "0x%X failed reason = %d", self, streamID, michael@0: self->mDownstreamRstReason)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: self->ChangeDownstreamState(PROCESSING_CONTROL_RST_STREAM); michael@0: return NS_OK; michael@0: } michael@0: michael@0: PLDHashOperator michael@0: SpdySession3::UpdateServerRwinEnumerator(nsAHttpTransaction *key, michael@0: nsAutoPtr &stream, michael@0: void *closure) michael@0: { michael@0: int32_t delta = *(static_cast(closure)); michael@0: stream->UpdateRemoteWindow(delta); michael@0: return PL_DHASH_NEXT; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleSettings(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_SETTINGS); michael@0: michael@0: if (self->mInputFrameDataSize < 4) { michael@0: LOG3(("SpdySession3::HandleSettings %p SETTINGS wrong length data=%d", michael@0: self, self->mInputFrameDataSize)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: uint32_t numEntries = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 2 * sizeof(uint32_t)); michael@0: michael@0: // Ensure frame is large enough for supplied number of entries michael@0: // Each entry is 8 bytes, frame data is reduced by 4 to account for michael@0: // the NumEntries value. michael@0: if ((self->mInputFrameDataSize - 4) < (numEntries * 8)) { michael@0: LOG3(("SpdySession3::HandleSettings %p SETTINGS wrong length data=%d", michael@0: self, self->mInputFrameDataSize)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: LOG3(("SpdySession3::HandleSettings %p SETTINGS Control Frame with %d entries", michael@0: self, numEntries)); michael@0: michael@0: for (uint32_t index = 0; index < numEntries; ++index) { michael@0: unsigned char *setting = reinterpret_cast michael@0: (self->mInputFrameBuffer.get()) + 12 + index * 8; michael@0: michael@0: uint32_t flags = setting[0]; michael@0: uint32_t id = NetworkEndian::readUint32(setting) & 0xffffff; michael@0: uint32_t value = NetworkEndian::readUint32(setting + 1 * sizeof(uint32_t)); michael@0: michael@0: LOG3(("Settings ID %d, Flags %X, Value %d", id, flags, value)); michael@0: michael@0: switch (id) michael@0: { michael@0: case SETTINGS_TYPE_UPLOAD_BW: michael@0: Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_UL_BW, value); michael@0: break; michael@0: michael@0: case SETTINGS_TYPE_DOWNLOAD_BW: michael@0: Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_DL_BW, value); michael@0: break; michael@0: michael@0: case SETTINGS_TYPE_RTT: michael@0: Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_RTT, value); michael@0: break; michael@0: michael@0: case SETTINGS_TYPE_MAX_CONCURRENT: michael@0: self->mMaxConcurrent = value; michael@0: Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value); michael@0: break; michael@0: michael@0: case SETTINGS_TYPE_CWND: michael@0: if (flags & PERSIST_VALUE) michael@0: { michael@0: nsRefPtr ci; michael@0: self->GetConnectionInfo(getter_AddRefs(ci)); michael@0: if (ci) michael@0: gHttpHandler->ConnMgr()->ReportSpdyCWNDSetting(ci, value); michael@0: } michael@0: Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_CWND, value); michael@0: break; michael@0: michael@0: case SETTINGS_TYPE_DOWNLOAD_RETRANS_RATE: michael@0: Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_RETRANS, value); michael@0: break; michael@0: michael@0: case SETTINGS_TYPE_INITIAL_WINDOW: michael@0: Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_IW, value >> 10); michael@0: { michael@0: int32_t delta = value - self->mServerInitialWindow; michael@0: self->mServerInitialWindow = value; michael@0: michael@0: // we need to add the delta to all open streams (delta can be negative) michael@0: self->mStreamTransactionHash.Enumerate(UpdateServerRwinEnumerator, michael@0: &delta); michael@0: } michael@0: break; michael@0: michael@0: default: michael@0: break; michael@0: } michael@0: michael@0: } michael@0: michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleNoop(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_NOOP); michael@0: michael@0: // Should not be receiving noop frames in spdy/3, so we'll just michael@0: // make a log and ignore it michael@0: michael@0: LOG3(("SpdySession3::HandleNoop %p NOP.", self)); michael@0: michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandlePing(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_PING); michael@0: michael@0: if (self->mInputFrameDataSize != 4) { michael@0: LOG3(("SpdySession3::HandlePing %p PING had wrong amount of data %d", michael@0: self, self->mInputFrameDataSize)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: uint32_t pingID = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 2 * sizeof(uint32_t)); michael@0: michael@0: LOG3(("SpdySession3::HandlePing %p PING ID 0x%X.", self, pingID)); michael@0: michael@0: if (pingID & 0x01) { michael@0: // presumably a reply to our timeout ping michael@0: self->mPingSentEpoch = 0; michael@0: } michael@0: else { michael@0: // Servers initiate even numbered pings, go ahead and echo it back michael@0: self->GeneratePing(pingID); michael@0: } michael@0: michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleGoAway(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_GOAWAY); michael@0: michael@0: if (self->mInputFrameDataSize != 8) { michael@0: LOG3(("SpdySession3::HandleGoAway %p GOAWAY had wrong amount of data %d", michael@0: self, self->mInputFrameDataSize)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: self->mShouldGoAway = true; michael@0: self->mGoAwayID = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 2 * sizeof(uint32_t)); michael@0: self->mCleanShutdown = true; michael@0: michael@0: // Find streams greater than the last-good ID and mark them for deletion michael@0: // in the mGoAwayStreamsToRestart queue with the GoAwayEnumerator. The michael@0: // underlying transaction can be restarted. michael@0: self->mStreamTransactionHash.Enumerate(GoAwayEnumerator, self); michael@0: michael@0: // Process the streams marked for deletion and restart. michael@0: uint32_t size = self->mGoAwayStreamsToRestart.GetSize(); michael@0: for (uint32_t count = 0; count < size; ++count) { michael@0: SpdyStream3 *stream = michael@0: static_cast(self->mGoAwayStreamsToRestart.PopFront()); michael@0: michael@0: self->CloseStream(stream, NS_ERROR_NET_RESET); michael@0: if (stream->HasRegisteredID()) michael@0: self->mStreamIDHash.Remove(stream->StreamID()); michael@0: self->mStreamTransactionHash.Remove(stream->Transaction()); michael@0: } michael@0: michael@0: // Queued streams can also be deleted from this session and restarted michael@0: // in another one. (they were never sent on the network so they implicitly michael@0: // are not covered by the last-good id. michael@0: size = self->mQueuedStreams.GetSize(); michael@0: for (uint32_t count = 0; count < size; ++count) { michael@0: SpdyStream3 *stream = michael@0: static_cast(self->mQueuedStreams.PopFront()); michael@0: self->CloseStream(stream, NS_ERROR_NET_RESET); michael@0: self->mStreamTransactionHash.Remove(stream->Transaction()); michael@0: } michael@0: michael@0: LOG3(("SpdySession3::HandleGoAway %p GOAWAY Last-Good-ID 0x%X status 0x%X " michael@0: "live streams=%d\n", self, self->mGoAwayID, michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + michael@0: 3 * sizeof(uint32_t)), michael@0: self->mStreamTransactionHash.Count())); michael@0: michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleHeaders(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_HEADERS); michael@0: michael@0: if (self->mInputFrameDataSize < 4) { michael@0: LOG3(("SpdySession3::HandleHeaders %p HEADERS had wrong amount of data %d", michael@0: self, self->mInputFrameDataSize)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: uint32_t streamID = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 2 * sizeof(uint32_t)); michael@0: LOG3(("SpdySession3::HandleHeaders %p HEADERS for Stream 0x%X.\n", michael@0: self, streamID)); michael@0: nsresult rv = self->SetInputFrameDataStream(streamID); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: if (!self->mInputFrameDataStream) { michael@0: LOG3(("SpdySession3::HandleHeaders %p lookup streamID 0x%X failed.\n", michael@0: self, streamID)); michael@0: if (streamID >= self->mNextStreamID) michael@0: self->GenerateRstStream(RST_INVALID_STREAM, streamID); michael@0: michael@0: rv = self->UncompressAndDiscard(12, self->mInputFrameDataSize - 4); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("SpdySession3::HandleHeaders uncompress failed\n")); michael@0: // this is fatal to the session michael@0: return rv; michael@0: } michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: // Uncompress the headers into local buffers in the SpdyStream, leaving michael@0: // them in spdy format for the time being. Make certain to do this michael@0: // step before any error handling that might abort the stream but not michael@0: // the session becuase the session compression context will become michael@0: // inconsistent if all of the compressed data is not processed. michael@0: rv = self->mInputFrameDataStream->Uncompress(&self->mDownstreamZlib, michael@0: self->mInputFrameBuffer + 12, michael@0: self->mInputFrameDataSize - 4); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("SpdySession3::HandleHeaders uncompress failed\n")); michael@0: return rv; michael@0: } michael@0: michael@0: self->mInputFrameDataLast = self->mInputFrameBuffer[4] & kFlag_Data_FIN; michael@0: self->mInputFrameDataStream-> michael@0: UpdateTransportReadEvents(self->mInputFrameDataSize); michael@0: self->mLastDataReadEpoch = self->mLastReadEpoch; michael@0: michael@0: if (self->mInputFrameBuffer[4] & ~kFlag_Data_FIN) { michael@0: LOG3(("Headers %p had undefined flag set 0x%X\n", self, streamID)); michael@0: self->CleanupStream(self->mInputFrameDataStream, NS_ERROR_ILLEGAL_VALUE, michael@0: RST_PROTOCOL_ERROR); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (!self->mInputFrameDataLast) { michael@0: // don't process the headers yet as there could be more HEADERS frames michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: rv = self->ResponseHeadersComplete(); michael@0: if (rv == NS_ERROR_ILLEGAL_VALUE) { michael@0: LOG3(("SpdySession3::HanndleHeaders %p PROTOCOL_ERROR detected 0x%X\n", michael@0: self, streamID)); michael@0: self->CleanupStream(self->mInputFrameDataStream, rv, RST_PROTOCOL_ERROR); michael@0: self->ResetDownstreamState(); michael@0: rv = NS_OK; michael@0: } michael@0: return rv; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleWindowUpdate(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_WINDOW_UPDATE); michael@0: michael@0: if (self->mInputFrameDataSize < 8) { michael@0: LOG3(("SpdySession3::HandleWindowUpdate %p Window Update wrong length %d\n", michael@0: self, self->mInputFrameDataSize)); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: michael@0: uint32_t delta = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 3 * sizeof(uint32_t)); michael@0: delta &= 0x7fffffff; michael@0: uint32_t streamID = michael@0: NetworkEndian::readUint32(self->mInputFrameBuffer + 2 * sizeof(uint32_t)); michael@0: streamID &= 0x7fffffff; michael@0: michael@0: LOG3(("SpdySession3::HandleWindowUpdate %p len=%d for Stream 0x%X.\n", michael@0: self, delta, streamID)); michael@0: nsresult rv = self->SetInputFrameDataStream(streamID); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: if (!self->mInputFrameDataStream) { michael@0: LOG3(("SpdySession3::HandleWindowUpdate %p lookup streamID 0x%X failed.\n", michael@0: self, streamID)); michael@0: if (streamID >= self->mNextStreamID) michael@0: self->GenerateRstStream(RST_INVALID_STREAM, streamID); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: self->mInputFrameDataStream->UpdateRemoteWindow(delta); michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::HandleCredential(SpdySession3 *self) michael@0: { michael@0: MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_CREDENTIAL); michael@0: michael@0: // These aren't used yet. Just ignore the frame. michael@0: michael@0: LOG3(("SpdySession3::HandleCredential %p NOP.", self)); michael@0: michael@0: self->ResetDownstreamState(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsAHttpTransaction. It is expected that nsHttpConnection is the caller michael@0: // of these methods michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: void michael@0: SpdySession3::OnTransportStatus(nsITransport* aTransport, michael@0: nsresult aStatus, michael@0: uint64_t aProgress) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: switch (aStatus) { michael@0: // These should appear only once, deliver to the first michael@0: // transaction on the session. michael@0: case NS_NET_STATUS_RESOLVING_HOST: michael@0: case NS_NET_STATUS_RESOLVED_HOST: michael@0: case NS_NET_STATUS_CONNECTING_TO: michael@0: case NS_NET_STATUS_CONNECTED_TO: michael@0: { michael@0: SpdyStream3 *target = mStreamIDHash.Get(1); michael@0: nsAHttpTransaction *transaction = target ? target->Transaction() : nullptr; michael@0: if (transaction) michael@0: transaction->OnTransportStatus(aTransport, aStatus, aProgress); michael@0: break; michael@0: } michael@0: michael@0: default: michael@0: // The other transport events are ignored here because there is no good michael@0: // way to map them to the right transaction in spdy. Instead, the events michael@0: // are generated again from the spdy code and passed directly to the michael@0: // correct transaction. michael@0: michael@0: // NS_NET_STATUS_SENDING_TO: michael@0: // This is generated by the socket transport when (part) of michael@0: // a transaction is written out michael@0: // michael@0: // There is no good way to map it to the right transaction in spdy, michael@0: // so it is ignored here and generated separately when the SYN_STREAM michael@0: // is sent from SpdyStream3::TransmitFrame michael@0: michael@0: // NS_NET_STATUS_WAITING_FOR: michael@0: // Created by nsHttpConnection when the request has been totally sent. michael@0: // There is no good way to map it to the right transaction in spdy, michael@0: // so it is ignored here and generated separately when the same michael@0: // condition is complete in SpdyStream3 when there is no more michael@0: // request body left to be transmitted. michael@0: michael@0: // NS_NET_STATUS_RECEIVING_FROM michael@0: // Generated in spdysession whenever we read a data frame or a syn_reply michael@0: // that can be attributed to a particular stream/transaction michael@0: michael@0: break; michael@0: } michael@0: } michael@0: michael@0: // ReadSegments() is used to write data to the network. Generally, HTTP michael@0: // request data is pulled from the approriate transaction and michael@0: // converted to SPDY data. Sometimes control data like window-update are michael@0: // generated instead. michael@0: michael@0: nsresult michael@0: SpdySession3::ReadSegments(nsAHttpSegmentReader *reader, michael@0: uint32_t count, michael@0: uint32_t *countRead) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: MOZ_ASSERT(!mSegmentReader || !reader || (mSegmentReader == reader), michael@0: "Inconsistent Write Function Callback"); michael@0: michael@0: if (reader) michael@0: mSegmentReader = reader; michael@0: michael@0: nsresult rv; michael@0: *countRead = 0; michael@0: michael@0: LOG3(("SpdySession3::ReadSegments %p", this)); michael@0: michael@0: SpdyStream3 *stream = static_cast(mReadyForWrite.PopFront()); michael@0: if (!stream) { michael@0: LOG3(("SpdySession3 %p could not identify a stream to write; suspending.", michael@0: this)); michael@0: FlushOutputQueue(); michael@0: SetWriteCallbacks(); michael@0: return NS_BASE_STREAM_WOULD_BLOCK; michael@0: } michael@0: michael@0: LOG3(("SpdySession3 %p will write from SpdyStream3 %p 0x%X " michael@0: "block-input=%d block-output=%d\n", this, stream, stream->StreamID(), michael@0: stream->RequestBlockedOnRead(), stream->BlockedOnRwin())); michael@0: michael@0: rv = stream->ReadSegments(this, count, countRead); michael@0: michael@0: // Not every permutation of stream->ReadSegents produces data (and therefore michael@0: // tries to flush the output queue) - SENDING_FIN_STREAM can be an example michael@0: // of that. But we might still have old data buffered that would be good michael@0: // to flush. michael@0: FlushOutputQueue(); michael@0: michael@0: // Allow new server reads - that might be data or control information michael@0: // (e.g. window updates or http replies) that are responses to these writes michael@0: ResumeRecv(); michael@0: michael@0: if (stream->RequestBlockedOnRead()) { michael@0: michael@0: // We are blocked waiting for input - either more http headers or michael@0: // any request body data. When more data from the request stream michael@0: // becomes available the httptransaction will call conn->ResumeSend(). michael@0: michael@0: LOG3(("SpdySession3::ReadSegments %p dealing with block on read", this)); michael@0: michael@0: // call readsegments again if there are other streams ready michael@0: // to run in this session michael@0: if (GetWriteQueueSize()) michael@0: rv = NS_OK; michael@0: else michael@0: rv = NS_BASE_STREAM_WOULD_BLOCK; michael@0: SetWriteCallbacks(); michael@0: return rv; michael@0: } michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG3(("SpdySession3::ReadSegments %p returning FAIL code %X", michael@0: this, rv)); michael@0: if (rv != NS_BASE_STREAM_WOULD_BLOCK) michael@0: CleanupStream(stream, rv, RST_CANCEL); michael@0: return rv; michael@0: } michael@0: michael@0: if (*countRead > 0) { michael@0: LOG3(("SpdySession3::ReadSegments %p stream=%p countread=%d", michael@0: this, stream, *countRead)); michael@0: mReadyForWrite.Push(stream); michael@0: SetWriteCallbacks(); michael@0: return rv; michael@0: } michael@0: michael@0: if (stream->BlockedOnRwin()) { michael@0: LOG3(("SpdySession3 %p will stream %p 0x%X suspended for flow control\n", michael@0: this, stream, stream->StreamID())); michael@0: return NS_BASE_STREAM_WOULD_BLOCK; michael@0: } michael@0: michael@0: LOG3(("SpdySession3::ReadSegments %p stream=%p stream send complete", michael@0: this, stream)); michael@0: michael@0: // call readsegments again if there are other streams ready michael@0: // to go in this session michael@0: SetWriteCallbacks(); michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: // WriteSegments() is used to read data off the socket. Generally this is michael@0: // just the SPDY frame header and from there the appropriate SPDYStream michael@0: // is identified from the Stream-ID. The http transaction associated with michael@0: // that read then pulls in the data directly, which it will feed to michael@0: // OnWriteSegment(). That function will gateway it into http and feed michael@0: // it to the appropriate transaction. michael@0: michael@0: // we call writer->OnWriteSegment via NetworkRead() to get a spdy header.. michael@0: // and decide if it is data or control.. if it is control, just deal with it. michael@0: // if it is data, identify the spdy stream michael@0: // call stream->WriteSegments which can call this::OnWriteSegment to get the michael@0: // data. It always gets full frames if they are part of the stream michael@0: michael@0: nsresult michael@0: SpdySession3::WriteSegments(nsAHttpSegmentWriter *writer, michael@0: uint32_t count, michael@0: uint32_t *countWritten) michael@0: { michael@0: typedef nsresult (*Control_FX) (SpdySession3 *self); michael@0: static const Control_FX sControlFunctions[] = michael@0: { michael@0: nullptr, michael@0: SpdySession3::HandleSynStream, michael@0: SpdySession3::HandleSynReply, michael@0: SpdySession3::HandleRstStream, michael@0: SpdySession3::HandleSettings, michael@0: SpdySession3::HandleNoop, michael@0: SpdySession3::HandlePing, michael@0: SpdySession3::HandleGoAway, michael@0: SpdySession3::HandleHeaders, michael@0: SpdySession3::HandleWindowUpdate, michael@0: SpdySession3::HandleCredential michael@0: }; michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: nsresult rv; michael@0: *countWritten = 0; michael@0: michael@0: if (mClosed) michael@0: return NS_ERROR_FAILURE; michael@0: michael@0: SetWriteCallbacks(); michael@0: michael@0: // If there are http transactions attached to a push stream with filled buffers michael@0: // trigger that data pump here. This only reads from buffers (not the network) michael@0: // so mDownstreamState doesn't matter. michael@0: SpdyStream3 *pushConnectedStream = michael@0: static_cast(mReadyForRead.PopFront()); michael@0: if (pushConnectedStream) { michael@0: LOG3(("SpdySession3::WriteSegments %p processing pushed stream 0x%X\n", michael@0: this, pushConnectedStream->StreamID())); michael@0: mSegmentWriter = writer; michael@0: rv = pushConnectedStream->WriteSegments(this, count, countWritten); michael@0: mSegmentWriter = nullptr; michael@0: michael@0: // The pipe in nsHttpTransaction rewrites CLOSED error codes into OK michael@0: // so we need this check to determine the truth. michael@0: if (NS_SUCCEEDED(rv) && !*countWritten && michael@0: pushConnectedStream->PushSource() && michael@0: pushConnectedStream->PushSource()->GetPushComplete()) { michael@0: rv = NS_BASE_STREAM_CLOSED; michael@0: } michael@0: michael@0: if (rv == NS_BASE_STREAM_CLOSED) { michael@0: CleanupStream(pushConnectedStream, NS_OK, RST_CANCEL); michael@0: rv = NS_OK; michael@0: } michael@0: michael@0: // if we return OK to nsHttpConnection it will use mSocketInCondition michael@0: // to determine whether to schedule more reads, incorrectly michael@0: // assuming that nsHttpConnection::OnSocketWrite() was called. michael@0: if (NS_SUCCEEDED(rv) || rv == NS_BASE_STREAM_WOULD_BLOCK) { michael@0: rv = NS_BASE_STREAM_WOULD_BLOCK; michael@0: ResumeRecv(); michael@0: } michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: // We buffer all control frames and act on them in this layer. michael@0: // We buffer the first 8 bytes of data frames (the header) but michael@0: // the actual data is passed through unprocessed. michael@0: michael@0: if (mDownstreamState == BUFFERING_FRAME_HEADER) { michael@0: // The first 8 bytes of every frame is header information that michael@0: // we are going to want to strip before passing to http. That is michael@0: // true of both control and data packets. michael@0: michael@0: MOZ_ASSERT(mInputFrameBufferUsed < 8, michael@0: "Frame Buffer Used Too Large for State"); michael@0: michael@0: rv = NetworkRead(writer, mInputFrameBuffer + mInputFrameBufferUsed, michael@0: 8 - mInputFrameBufferUsed, countWritten); michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG3(("SpdySession3 %p buffering frame header read failure %x\n", michael@0: this, rv)); michael@0: // maybe just blocked reading from network michael@0: if (rv == NS_BASE_STREAM_WOULD_BLOCK) michael@0: rv = NS_OK; michael@0: return rv; michael@0: } michael@0: michael@0: LogIO(this, nullptr, "Reading Frame Header", michael@0: mInputFrameBuffer + mInputFrameBufferUsed, *countWritten); michael@0: michael@0: mInputFrameBufferUsed += *countWritten; michael@0: michael@0: if (mInputFrameBufferUsed < 8) michael@0: { michael@0: LOG3(("SpdySession3::WriteSegments %p " michael@0: "BUFFERING FRAME HEADER incomplete size=%d", michael@0: this, mInputFrameBufferUsed)); michael@0: return rv; michael@0: } michael@0: michael@0: // For both control and data frames the second 32 bit word of the header michael@0: // is 8-flags, 24-length. (network byte order) michael@0: mInputFrameDataSize = michael@0: NetworkEndian::readUint32(mInputFrameBuffer + 1 * sizeof(uint32_t)); michael@0: mInputFrameDataSize &= 0x00ffffff; michael@0: mInputFrameDataRead = 0; michael@0: michael@0: if (mInputFrameBuffer[0] & kFlag_Control) { michael@0: EnsureBuffer(mInputFrameBuffer, mInputFrameDataSize + 8, 8, michael@0: mInputFrameBufferSize); michael@0: ChangeDownstreamState(BUFFERING_CONTROL_FRAME); michael@0: michael@0: // The first 32 bit word of the header is michael@0: // 1 ctrl - 15 version - 16 type michael@0: uint16_t version = NetworkEndian::readUint16(mInputFrameBuffer); michael@0: version &= 0x7fff; michael@0: michael@0: mFrameControlType = michael@0: NetworkEndian::readUint16(mInputFrameBuffer + sizeof(uint16_t)); michael@0: michael@0: LOG3(("SpdySession3::WriteSegments %p - Control Frame Identified " michael@0: "type %d version %d data len %d", michael@0: this, mFrameControlType, version, mInputFrameDataSize)); michael@0: michael@0: if (mFrameControlType >= CONTROL_TYPE_LAST || michael@0: mFrameControlType <= CONTROL_TYPE_FIRST) michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: michael@0: if (version != kVersion) michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: else { michael@0: ChangeDownstreamState(PROCESSING_DATA_FRAME); michael@0: michael@0: Telemetry::Accumulate(Telemetry::SPDY_CHUNK_RECVD, michael@0: mInputFrameDataSize >> 10); michael@0: mLastDataReadEpoch = mLastReadEpoch; michael@0: michael@0: uint32_t streamID = NetworkEndian::readUint32(mInputFrameBuffer); michael@0: rv = SetInputFrameDataStream(streamID); michael@0: if (NS_FAILED(rv)) { michael@0: LOG(("SpdySession3::WriteSegments %p lookup streamID 0x%X failed. " michael@0: "probably due to verification.\n", this, streamID)); michael@0: return rv; michael@0: } michael@0: if (!mInputFrameDataStream) { michael@0: LOG3(("SpdySession3::WriteSegments %p lookup streamID 0x%X failed. " michael@0: "Next = 0x%X", this, streamID, mNextStreamID)); michael@0: if (streamID >= mNextStreamID) michael@0: GenerateRstStream(RST_INVALID_STREAM, streamID); michael@0: ChangeDownstreamState(DISCARDING_DATA_FRAME); michael@0: } michael@0: else if (mInputFrameDataStream->RecvdFin()) { michael@0: LOG3(("SpdySession3::WriteSegments %p streamID 0x%X " michael@0: "Data arrived for already server closed stream.\n", michael@0: this, streamID)); michael@0: GenerateRstStream(RST_STREAM_ALREADY_CLOSED, streamID); michael@0: ChangeDownstreamState(DISCARDING_DATA_FRAME); michael@0: } michael@0: else if (!mInputFrameDataStream->RecvdData()) { michael@0: LOG3(("SpdySession3 %p First Data Frame Flushes Headers stream 0x%X\n", michael@0: this, streamID)); michael@0: michael@0: mInputFrameDataStream->SetRecvdData(true); michael@0: rv = ResponseHeadersComplete(); michael@0: if (rv == NS_ERROR_ILLEGAL_VALUE) { michael@0: LOG3(("SpdySession3 %p PROTOCOL_ERROR detected 0x%X\n", michael@0: this, streamID)); michael@0: CleanupStream(mInputFrameDataStream, rv, RST_PROTOCOL_ERROR); michael@0: ChangeDownstreamState(DISCARDING_DATA_FRAME); michael@0: } michael@0: else { michael@0: mDataPending = true; michael@0: } michael@0: } michael@0: michael@0: mInputFrameDataLast = (mInputFrameBuffer[4] & kFlag_Data_FIN); michael@0: LOG3(("Start Processing Data Frame. " michael@0: "Session=%p Stream ID 0x%X Stream Ptr %p Fin=%d Len=%d", michael@0: this, streamID, mInputFrameDataStream, mInputFrameDataLast, michael@0: mInputFrameDataSize)); michael@0: UpdateLocalRwin(mInputFrameDataStream, mInputFrameDataSize); michael@0: } michael@0: } michael@0: michael@0: if (mDownstreamState == PROCESSING_CONTROL_RST_STREAM) { michael@0: if (mDownstreamRstReason == RST_REFUSED_STREAM) michael@0: rv = NS_ERROR_NET_RESET; //we can retry this 100% safely michael@0: else if (mDownstreamRstReason == RST_CANCEL || michael@0: mDownstreamRstReason == RST_PROTOCOL_ERROR || michael@0: mDownstreamRstReason == RST_INTERNAL_ERROR || michael@0: mDownstreamRstReason == RST_UNSUPPORTED_VERSION) michael@0: rv = NS_ERROR_NET_INTERRUPT; michael@0: else if (mDownstreamRstReason == RST_FRAME_TOO_LARGE) michael@0: rv = NS_ERROR_FILE_TOO_BIG; michael@0: else michael@0: rv = NS_ERROR_ILLEGAL_VALUE; michael@0: michael@0: if (mDownstreamRstReason != RST_REFUSED_STREAM && michael@0: mDownstreamRstReason != RST_CANCEL) michael@0: mShouldGoAway = true; michael@0: michael@0: // mInputFrameDataStream is reset by ChangeDownstreamState michael@0: SpdyStream3 *stream = mInputFrameDataStream; michael@0: ResetDownstreamState(); michael@0: LOG3(("SpdySession3::WriteSegments cleanup stream on recv of rst " michael@0: "session=%p stream=%p 0x%X\n", this, stream, michael@0: stream ? stream->StreamID() : 0)); michael@0: CleanupStream(stream, rv, RST_CANCEL); michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (mDownstreamState == PROCESSING_DATA_FRAME || michael@0: mDownstreamState == PROCESSING_COMPLETE_HEADERS) { michael@0: michael@0: // The cleanup stream should only be set while stream->WriteSegments is michael@0: // on the stack and then cleaned up in this code block afterwards. michael@0: MOZ_ASSERT(!mNeedsCleanup, "cleanup stream set unexpectedly"); michael@0: mNeedsCleanup = nullptr; /* just in case */ michael@0: michael@0: mSegmentWriter = writer; michael@0: rv = mInputFrameDataStream->WriteSegments(this, count, countWritten); michael@0: mSegmentWriter = nullptr; michael@0: michael@0: mLastDataReadEpoch = mLastReadEpoch; michael@0: michael@0: if (SoftStreamError(rv)) { michael@0: // This will happen when the transaction figures out it is EOF, generally michael@0: // due to a content-length match being made. Return OK from this function michael@0: // otherwise the whole session would be torn down. michael@0: SpdyStream3 *stream = mInputFrameDataStream; michael@0: michael@0: // if we were doing PROCESSING_COMPLETE_HEADERS need to pop the state michael@0: // back to PROCESSING_DATA_FRAME where we came from michael@0: mDownstreamState = PROCESSING_DATA_FRAME; michael@0: michael@0: if (mInputFrameDataRead == mInputFrameDataSize) michael@0: ResetDownstreamState(); michael@0: LOG3(("SpdySession3::WriteSegments session=%p stream=%p 0x%X " michael@0: "needscleanup=%p. cleanup stream based on " michael@0: "stream->writeSegments returning code %X\n", michael@0: this, stream, stream ? stream->StreamID() : 0, michael@0: mNeedsCleanup, rv)); michael@0: CleanupStream(stream, NS_OK, RST_CANCEL); michael@0: MOZ_ASSERT(!mNeedsCleanup, "double cleanup out of data frame"); michael@0: mNeedsCleanup = nullptr; /* just in case */ michael@0: return NS_OK; michael@0: } michael@0: michael@0: if (mNeedsCleanup) { michael@0: LOG3(("SpdySession3::WriteSegments session=%p stream=%p 0x%X " michael@0: "cleanup stream based on mNeedsCleanup.\n", michael@0: this, mNeedsCleanup, mNeedsCleanup ? mNeedsCleanup->StreamID() : 0)); michael@0: CleanupStream(mNeedsCleanup, NS_OK, RST_CANCEL); michael@0: mNeedsCleanup = nullptr; michael@0: } michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG3(("SpdySession3 %p data frame read failure %x\n", this, rv)); michael@0: // maybe just blocked reading from network michael@0: if (rv == NS_BASE_STREAM_WOULD_BLOCK) michael@0: rv = NS_OK; michael@0: } michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: if (mDownstreamState == DISCARDING_DATA_FRAME) { michael@0: char trash[4096]; michael@0: uint32_t count = std::min(4096U, mInputFrameDataSize - mInputFrameDataRead); michael@0: michael@0: if (!count) { michael@0: ResetDownstreamState(); michael@0: ResumeRecv(); michael@0: return NS_BASE_STREAM_WOULD_BLOCK; michael@0: } michael@0: michael@0: rv = NetworkRead(writer, trash, count, countWritten); michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG3(("SpdySession3 %p discard frame read failure %x\n", this, rv)); michael@0: // maybe just blocked reading from network michael@0: if (rv == NS_BASE_STREAM_WOULD_BLOCK) michael@0: rv = NS_OK; michael@0: return rv; michael@0: } michael@0: michael@0: LogIO(this, nullptr, "Discarding Frame", trash, *countWritten); michael@0: michael@0: mInputFrameDataRead += *countWritten; michael@0: michael@0: if (mInputFrameDataRead == mInputFrameDataSize) michael@0: ResetDownstreamState(); michael@0: return rv; michael@0: } michael@0: michael@0: MOZ_ASSERT(mDownstreamState == BUFFERING_CONTROL_FRAME); michael@0: if (mDownstreamState != BUFFERING_CONTROL_FRAME) { michael@0: // this cannot happen michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: MOZ_ASSERT(mInputFrameBufferUsed == 8, michael@0: "Frame Buffer Header Not Present"); michael@0: michael@0: rv = NetworkRead(writer, mInputFrameBuffer + 8 + mInputFrameDataRead, michael@0: mInputFrameDataSize - mInputFrameDataRead, countWritten); michael@0: michael@0: if (NS_FAILED(rv)) { michael@0: LOG3(("SpdySession3 %p buffering control frame read failure %x\n", michael@0: this, rv)); michael@0: // maybe just blocked reading from network michael@0: if (rv == NS_BASE_STREAM_WOULD_BLOCK) michael@0: rv = NS_OK; michael@0: return rv; michael@0: } michael@0: michael@0: LogIO(this, nullptr, "Reading Control Frame", michael@0: mInputFrameBuffer + 8 + mInputFrameDataRead, *countWritten); michael@0: michael@0: mInputFrameDataRead += *countWritten; michael@0: michael@0: if (mInputFrameDataRead != mInputFrameDataSize) michael@0: return NS_OK; michael@0: michael@0: // This check is actually redundant, the control type was previously michael@0: // checked to make sure it was in range, but we will check it again michael@0: // at time of use to make sure a regression doesn't creep in. michael@0: if (mFrameControlType >= CONTROL_TYPE_LAST || michael@0: mFrameControlType <= CONTROL_TYPE_FIRST) michael@0: { michael@0: MOZ_ASSERT(false, "control type out of range"); michael@0: return NS_ERROR_ILLEGAL_VALUE; michael@0: } michael@0: rv = sControlFunctions[mFrameControlType](this); michael@0: michael@0: MOZ_ASSERT(NS_FAILED(rv) || michael@0: mDownstreamState != BUFFERING_CONTROL_FRAME, michael@0: "Control Handler returned OK but did not change state"); michael@0: michael@0: if (mShouldGoAway && !mStreamTransactionHash.Count()) michael@0: Close(NS_OK); michael@0: return rv; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::UpdateLocalRwin(SpdyStream3 *stream, michael@0: uint32_t bytes) michael@0: { michael@0: // If this data packet was not for a valid or live stream then there michael@0: // is no reason to mess with the flow control michael@0: if (!stream || stream->RecvdFin()) michael@0: return; michael@0: michael@0: stream->DecrementLocalWindow(bytes); michael@0: michael@0: // Don't necessarily ack every data packet. Only do it michael@0: // after a significant amount of data. michael@0: uint64_t unacked = stream->LocalUnAcked(); michael@0: int64_t localWindow = stream->LocalWindow(); michael@0: michael@0: LOG3(("SpdySession3::UpdateLocalRwin this=%p id=0x%X newbytes=%u " michael@0: "unacked=%llu localWindow=%lld\n", michael@0: this, stream->StreamID(), bytes, unacked, localWindow)); michael@0: michael@0: if (!unacked) michael@0: return; michael@0: michael@0: if ((unacked < kMinimumToAck) && (localWindow > kEmergencyWindowThreshold)) michael@0: return; michael@0: michael@0: if (!stream->HasSink()) { michael@0: LOG3(("SpdySession3::UpdateLocalRwin %p 0x%X Pushed Stream Has No Sink\n", michael@0: this, stream->StreamID())); michael@0: return; michael@0: } michael@0: michael@0: // Generate window updates directly out of spdysession instead of the stream michael@0: // in order to avoid queue delays in getting the 'ACK' out. michael@0: uint32_t toack = (unacked <= 0x7fffffffU) ? unacked : 0x7fffffffU; michael@0: michael@0: LOG3(("SpdySession3::UpdateLocalRwin Ack this=%p id=0x%X acksize=%d\n", michael@0: this, stream->StreamID(), toack)); michael@0: stream->IncrementLocalWindow(toack); michael@0: michael@0: static const uint32_t dataLen = 8; michael@0: EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 8 + dataLen, michael@0: mOutputQueueUsed, mOutputQueueSize); michael@0: char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed; michael@0: mOutputQueueUsed += 8 + dataLen; michael@0: michael@0: memset(packet, 0, 8 + dataLen); michael@0: packet[0] = kFlag_Control; michael@0: packet[1] = kVersion; michael@0: packet[3] = CONTROL_TYPE_WINDOW_UPDATE; michael@0: packet[7] = dataLen; michael@0: michael@0: NetworkEndian::writeUint32(packet + 8, stream->StreamID()); michael@0: NetworkEndian::writeUint32(packet + 12, toack); michael@0: michael@0: LogIO(this, stream, "Window Update", packet, 8 + dataLen); michael@0: FlushOutputQueue(); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::Close(nsresult aReason) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: if (mClosed) michael@0: return; michael@0: michael@0: LOG3(("SpdySession3::Close %p %X", this, aReason)); michael@0: michael@0: mClosed = true; michael@0: michael@0: mStreamTransactionHash.Enumerate(ShutdownEnumerator, this); michael@0: mStreamIDHash.Clear(); michael@0: mStreamTransactionHash.Clear(); michael@0: michael@0: uint32_t goAwayReason; michael@0: if (NS_SUCCEEDED(aReason)) { michael@0: goAwayReason = OK; michael@0: } else if (aReason == NS_ERROR_ILLEGAL_VALUE) { michael@0: goAwayReason = PROTOCOL_ERROR; michael@0: } else { michael@0: goAwayReason = INTERNAL_ERROR; michael@0: } michael@0: GenerateGoAway(goAwayReason); michael@0: mConnection = nullptr; michael@0: mSegmentReader = nullptr; michael@0: mSegmentWriter = nullptr; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::CloseTransaction(nsAHttpTransaction *aTransaction, michael@0: nsresult aResult) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::CloseTransaction %p %p %x", this, aTransaction, aResult)); michael@0: michael@0: // Generally this arrives as a cancel event from the connection manager. michael@0: michael@0: // need to find the stream and call CleanupStream() on it. michael@0: SpdyStream3 *stream = mStreamTransactionHash.Get(aTransaction); michael@0: if (!stream) { michael@0: LOG3(("SpdySession3::CloseTransaction %p %p %x - not found.", michael@0: this, aTransaction, aResult)); michael@0: return; michael@0: } michael@0: LOG3(("SpdySession3::CloseTranscation probably a cancel. " michael@0: "this=%p, trans=%p, result=%x, streamID=0x%X stream=%p", michael@0: this, aTransaction, aResult, stream->StreamID(), stream)); michael@0: CleanupStream(stream, aResult, RST_CANCEL); michael@0: ResumeRecv(); michael@0: } michael@0: michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsAHttpSegmentReader michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: nsresult michael@0: SpdySession3::OnReadSegment(const char *buf, michael@0: uint32_t count, michael@0: uint32_t *countRead) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: nsresult rv; michael@0: michael@0: // If we can release old queued data then we can try and write the new michael@0: // data directly to the network without using the output queue at all michael@0: if (mOutputQueueUsed) michael@0: FlushOutputQueue(); michael@0: michael@0: if (!mOutputQueueUsed && mSegmentReader) { michael@0: // try and write directly without output queue michael@0: rv = mSegmentReader->OnReadSegment(buf, count, countRead); michael@0: michael@0: if (rv == NS_BASE_STREAM_WOULD_BLOCK) michael@0: *countRead = 0; michael@0: else if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: if (*countRead < count) { michael@0: uint32_t required = count - *countRead; michael@0: // assuming a commitment() happened, this ensurebuffer is a nop michael@0: // but just in case the queuesize is too small for the required data michael@0: // call ensurebuffer(). michael@0: EnsureBuffer(mOutputQueueBuffer, required, 0, mOutputQueueSize); michael@0: memcpy(mOutputQueueBuffer.get(), buf + *countRead, required); michael@0: mOutputQueueUsed = required; michael@0: } michael@0: michael@0: *countRead = count; michael@0: return NS_OK; michael@0: } michael@0: michael@0: // At this point we are going to buffer the new data in the output michael@0: // queue if it fits. By coalescing multiple small submissions into one larger michael@0: // buffer we can get larger writes out to the network later on. michael@0: michael@0: // This routine should not be allowed to fill up the output queue michael@0: // all on its own - at least kQueueReserved bytes are always left michael@0: // for other routines to use - but this is an all-or-nothing function, michael@0: // so if it will not all fit just return WOULD_BLOCK michael@0: michael@0: if ((mOutputQueueUsed + count) > (mOutputQueueSize - kQueueReserved)) michael@0: return NS_BASE_STREAM_WOULD_BLOCK; michael@0: michael@0: memcpy(mOutputQueueBuffer.get() + mOutputQueueUsed, buf, count); michael@0: mOutputQueueUsed += count; michael@0: *countRead = count; michael@0: michael@0: FlushOutputQueue(); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::CommitToSegmentSize(uint32_t count, bool forceCommitment) michael@0: { michael@0: if (mOutputQueueUsed) michael@0: FlushOutputQueue(); michael@0: michael@0: // would there be enough room to buffer this if needed? michael@0: if ((mOutputQueueUsed + count) <= (mOutputQueueSize - kQueueReserved)) michael@0: return NS_OK; michael@0: michael@0: // if we are using part of our buffers already, try again later unless michael@0: // forceCommitment is set. michael@0: if (mOutputQueueUsed && !forceCommitment) michael@0: return NS_BASE_STREAM_WOULD_BLOCK; michael@0: michael@0: if (mOutputQueueUsed) { michael@0: // normally we avoid the memmove of RealignOutputQueue, but we'll try michael@0: // it if forceCommitment is set before growing the buffer. michael@0: RealignOutputQueue(); michael@0: michael@0: // is there enough room now? michael@0: if ((mOutputQueueUsed + count) <= (mOutputQueueSize - kQueueReserved)) michael@0: return NS_OK; michael@0: } michael@0: michael@0: // resize the buffers as needed michael@0: EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + count + kQueueReserved, michael@0: mOutputQueueUsed, mOutputQueueSize); michael@0: michael@0: MOZ_ASSERT((mOutputQueueUsed + count) <= (mOutputQueueSize - kQueueReserved), michael@0: "buffer not as large as expected"); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsAHttpSegmentWriter michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: nsresult michael@0: SpdySession3::OnWriteSegment(char *buf, michael@0: uint32_t count, michael@0: uint32_t *countWritten) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: nsresult rv; michael@0: michael@0: if (!mSegmentWriter) { michael@0: // the only way this could happen would be if Close() were called on the michael@0: // stack with WriteSegments() michael@0: return NS_ERROR_FAILURE; michael@0: } michael@0: michael@0: if (mDownstreamState == PROCESSING_DATA_FRAME) { michael@0: michael@0: if (mInputFrameDataLast && michael@0: mInputFrameDataRead == mInputFrameDataSize) { michael@0: *countWritten = 0; michael@0: SetNeedsCleanup(); michael@0: return NS_BASE_STREAM_CLOSED; michael@0: } michael@0: michael@0: count = std::min(count, mInputFrameDataSize - mInputFrameDataRead); michael@0: rv = NetworkRead(mSegmentWriter, buf, count, countWritten); michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: LogIO(this, mInputFrameDataStream, "Reading Data Frame", michael@0: buf, *countWritten); michael@0: michael@0: mInputFrameDataRead += *countWritten; michael@0: michael@0: mInputFrameDataStream->UpdateTransportReadEvents(*countWritten); michael@0: if ((mInputFrameDataRead == mInputFrameDataSize) && !mInputFrameDataLast) michael@0: ResetDownstreamState(); michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: if (mDownstreamState == PROCESSING_COMPLETE_HEADERS) { michael@0: michael@0: if (mFlatHTTPResponseHeaders.Length() == mFlatHTTPResponseHeadersOut && michael@0: mInputFrameDataLast) { michael@0: *countWritten = 0; michael@0: SetNeedsCleanup(); michael@0: return NS_BASE_STREAM_CLOSED; michael@0: } michael@0: michael@0: count = std::min(count, michael@0: mFlatHTTPResponseHeaders.Length() - michael@0: mFlatHTTPResponseHeadersOut); michael@0: memcpy(buf, michael@0: mFlatHTTPResponseHeaders.get() + mFlatHTTPResponseHeadersOut, michael@0: count); michael@0: mFlatHTTPResponseHeadersOut += count; michael@0: *countWritten = count; michael@0: michael@0: if (mFlatHTTPResponseHeaders.Length() == mFlatHTTPResponseHeadersOut) { michael@0: if (mDataPending) { michael@0: // Now ready to process data frames - pop PROCESING_DATA_FRAME back onto michael@0: // the stack because receipt of that first data frame triggered the michael@0: // response header processing michael@0: mDataPending = false; michael@0: ChangeDownstreamState(PROCESSING_DATA_FRAME); michael@0: } michael@0: else if (!mInputFrameDataLast) { michael@0: // If more frames are expected in this stream, then reset the state so they can be michael@0: // handled. Otherwise (e.g. a 0 length response with the fin on the SYN_REPLY) michael@0: // stay in PROCESSING_COMPLETE_HEADERS state so the SetNeedsCleanup() code above can michael@0: // cleanup the stream. michael@0: ResetDownstreamState(); michael@0: } michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::SetNeedsCleanup() michael@0: { michael@0: LOG3(("SpdySession3::SetNeedsCleanup %p - recorded downstream fin of " michael@0: "stream %p 0x%X", this, mInputFrameDataStream, michael@0: mInputFrameDataStream->StreamID())); michael@0: michael@0: // This will result in Close() being called michael@0: MOZ_ASSERT(!mNeedsCleanup, "mNeedsCleanup unexpectedly set"); michael@0: mNeedsCleanup = mInputFrameDataStream; michael@0: ResetDownstreamState(); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::ConnectPushedStream(SpdyStream3 *stream) michael@0: { michael@0: mReadyForRead.Push(stream); michael@0: ForceRecv(); michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // Modified methods of nsAHttpConnection michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: void michael@0: SpdySession3::TransactionHasDataToWrite(nsAHttpTransaction *caller) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::TransactionHasDataToWrite %p trans=%p", this, caller)); michael@0: michael@0: // a trapped signal from the http transaction to the connection that michael@0: // it is no longer blocked on read. michael@0: michael@0: SpdyStream3 *stream = mStreamTransactionHash.Get(caller); michael@0: if (!stream || !VerifyStream(stream)) { michael@0: LOG3(("SpdySession3::TransactionHasDataToWrite %p caller %p not found", michael@0: this, caller)); michael@0: return; michael@0: } michael@0: michael@0: LOG3(("SpdySession3::TransactionHasDataToWrite %p ID is 0x%X\n", michael@0: this, stream->StreamID())); michael@0: michael@0: mReadyForWrite.Push(stream); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::TransactionHasDataToWrite(SpdyStream3 *stream) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: LOG3(("SpdySession3::TransactionHasDataToWrite %p stream=%p ID=%x", michael@0: this, stream, stream->StreamID())); michael@0: michael@0: mReadyForWrite.Push(stream); michael@0: SetWriteCallbacks(); michael@0: } michael@0: michael@0: bool michael@0: SpdySession3::IsPersistent() michael@0: { michael@0: return true; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::TakeTransport(nsISocketTransport **, michael@0: nsIAsyncInputStream **, michael@0: nsIAsyncOutputStream **) michael@0: { michael@0: MOZ_ASSERT(false, "TakeTransport of SpdySession3"); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: nsHttpConnection * michael@0: SpdySession3::TakeHttpConnection() michael@0: { michael@0: MOZ_ASSERT(false, "TakeHttpConnection of SpdySession3"); michael@0: return nullptr; michael@0: } michael@0: michael@0: uint32_t michael@0: SpdySession3::CancelPipeline(nsresult reason) michael@0: { michael@0: // we don't pipeline inside spdy, so this isn't an issue michael@0: return 0; michael@0: } michael@0: michael@0: nsAHttpTransaction::Classifier michael@0: SpdySession3::Classification() michael@0: { michael@0: if (!mConnection) michael@0: return nsAHttpTransaction::CLASS_GENERAL; michael@0: return mConnection->Classification(); michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // unused methods of nsAHttpTransaction michael@0: // We can be sure of this because SpdySession3 is only constructed in michael@0: // nsHttpConnection and is never passed out of that object michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: void michael@0: SpdySession3::SetConnection(nsAHttpConnection *) michael@0: { michael@0: // This is unexpected michael@0: MOZ_ASSERT(false, "SpdySession3::SetConnection()"); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::GetSecurityCallbacks(nsIInterfaceRequestor **) michael@0: { michael@0: // This is unexpected michael@0: MOZ_ASSERT(false, "SpdySession3::GetSecurityCallbacks()"); michael@0: } michael@0: michael@0: void michael@0: SpdySession3::SetProxyConnectFailed() michael@0: { michael@0: MOZ_ASSERT(false, "SpdySession3::SetProxyConnectFailed()"); michael@0: } michael@0: michael@0: bool michael@0: SpdySession3::IsDone() michael@0: { michael@0: return !mStreamTransactionHash.Count(); michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::Status() michael@0: { michael@0: MOZ_ASSERT(false, "SpdySession3::Status()"); michael@0: return NS_ERROR_UNEXPECTED; michael@0: } michael@0: michael@0: uint32_t michael@0: SpdySession3::Caps() michael@0: { michael@0: MOZ_ASSERT(false, "SpdySession3::Caps()"); michael@0: return 0; michael@0: } michael@0: michael@0: void michael@0: SpdySession3::SetDNSWasRefreshed() michael@0: { michael@0: } michael@0: michael@0: uint64_t michael@0: SpdySession3::Available() michael@0: { michael@0: MOZ_ASSERT(false, "SpdySession3::Available()"); michael@0: return 0; michael@0: } michael@0: michael@0: nsHttpRequestHead * michael@0: SpdySession3::RequestHead() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: MOZ_ASSERT(false, michael@0: "SpdySession3::RequestHead() " michael@0: "should not be called after SPDY is setup"); michael@0: return nullptr; michael@0: } michael@0: michael@0: uint32_t michael@0: SpdySession3::Http1xTransactionCount() michael@0: { michael@0: return 0; michael@0: } michael@0: michael@0: // used as an enumerator by TakeSubTransactions() michael@0: static PLDHashOperator michael@0: TakeStream(nsAHttpTransaction *key, michael@0: nsAutoPtr &stream, michael@0: void *closure) michael@0: { michael@0: nsTArray > *list = michael@0: static_cast > *>(closure); michael@0: michael@0: list->AppendElement(key); michael@0: michael@0: // removing the stream from the hash will delete the stream michael@0: // and drop the transaction reference the hash held michael@0: return PL_DHASH_REMOVE; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::TakeSubTransactions( michael@0: nsTArray > &outTransactions) michael@0: { michael@0: // Generally this cannot be done with spdy as transactions are michael@0: // started right away. michael@0: michael@0: LOG3(("SpdySession3::TakeSubTransactions %p\n", this)); michael@0: michael@0: if (mConcurrentHighWater > 0) michael@0: return NS_ERROR_ALREADY_OPENED; michael@0: michael@0: LOG3((" taking %d\n", mStreamTransactionHash.Count())); michael@0: michael@0: mStreamTransactionHash.Enumerate(TakeStream, &outTransactions); michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::AddTransaction(nsAHttpTransaction *) michael@0: { michael@0: // This API is meant for pipelining, SpdySession3's should be michael@0: // extended with AddStream() michael@0: michael@0: MOZ_ASSERT(false, michael@0: "SpdySession3::AddTransaction() should not be called"); michael@0: michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: } michael@0: michael@0: uint32_t michael@0: SpdySession3::PipelineDepth() michael@0: { michael@0: return IsDone() ? 0 : 1; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::SetPipelinePosition(int32_t position) michael@0: { michael@0: // This API is meant for pipelining, SpdySession3's should be michael@0: // extended with AddStream() michael@0: michael@0: MOZ_ASSERT(false, michael@0: "SpdySession3::SetPipelinePosition() should not be called"); michael@0: michael@0: return NS_ERROR_NOT_IMPLEMENTED; michael@0: } michael@0: michael@0: int32_t michael@0: SpdySession3::PipelinePosition() michael@0: { michael@0: return 0; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // Pass through methods of nsAHttpConnection michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: nsAHttpConnection * michael@0: SpdySession3::Connection() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: return mConnection; michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::OnHeadersAvailable(nsAHttpTransaction *transaction, michael@0: nsHttpRequestHead *requestHead, michael@0: nsHttpResponseHead *responseHead, michael@0: bool *reset) michael@0: { michael@0: return mConnection->OnHeadersAvailable(transaction, michael@0: requestHead, michael@0: responseHead, michael@0: reset); michael@0: } michael@0: michael@0: bool michael@0: SpdySession3::IsReused() michael@0: { michael@0: return mConnection->IsReused(); michael@0: } michael@0: michael@0: nsresult michael@0: SpdySession3::PushBack(const char *buf, uint32_t len) michael@0: { michael@0: return mConnection->PushBack(buf, len); michael@0: } michael@0: michael@0: } // namespace mozilla::net michael@0: } // namespace mozilla