netwerk/protocol/http/SpdySession31.cpp

Thu, 15 Jan 2015 21:03:48 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 15 Jan 2015 21:03:48 +0100
branch
TOR_BUG_9701
changeset 11
deefc01c0e14
permissions
-rw-r--r--

Integrate friendly tips from Tor colleagues to make (or not) 4.5 alpha 3;
This includes removal of overloaded (but unused) methods, and addition of
a overlooked call to DataStruct::SetData(nsISupports, uint32_t, bool.)

     1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     2 /* vim: set sw=2 ts=8 et tw=80 : */
     3 /* This Source Code Form is subject to the terms of the Mozilla Public
     4  * License, v. 2.0. If a copy of the MPL was not distributed with this
     5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     7 // HttpLog.h should generally be included first
     8 #include "HttpLog.h"
    10 // Log on level :5, instead of default :4.
    11 #undef LOG
    12 #define LOG(args) LOG5(args)
    13 #undef LOG_ENABLED
    14 #define LOG_ENABLED() LOG5_ENABLED()
    16 #include "mozilla/Telemetry.h"
    17 #include "mozilla/Preferences.h"
    18 #include "nsHttp.h"
    19 #include "nsHttpHandler.h"
    20 #include "nsHttpConnection.h"
    21 #include "nsILoadGroup.h"
    22 #include "prprf.h"
    23 #include "prnetdb.h"
    24 #include "SpdyPush31.h"
    25 #include "SpdySession31.h"
    26 #include "SpdyStream31.h"
    27 #include "SpdyZlibReporter.h"
    29 #include <algorithm>
    31 #ifdef DEBUG
    32 // defined by the socket transport service while active
    33 extern PRThread *gSocketThread;
    34 #endif
    36 namespace mozilla {
    37 namespace net {
    39 // SpdySession31 has multiple inheritance of things that implement
    40 // nsISupports, so this magic is taken from nsHttpPipeline that
    41 // implements some of the same abstract classes.
    42 NS_IMPL_ADDREF(SpdySession31)
    43 NS_IMPL_RELEASE(SpdySession31)
    44 NS_INTERFACE_MAP_BEGIN(SpdySession31)
    45 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsAHttpConnection)
    46 NS_INTERFACE_MAP_END
    48 SpdySession31::SpdySession31(nsAHttpTransaction *aHttpTransaction,
    49                              nsISocketTransport *aSocketTransport,
    50                              int32_t firstPriority)
    51   : mSocketTransport(aSocketTransport),
    52   mSegmentReader(nullptr),
    53   mSegmentWriter(nullptr),
    54   mNextStreamID(1),
    55   mConcurrentHighWater(0),
    56   mDownstreamState(BUFFERING_FRAME_HEADER),
    57   mInputFrameBufferSize(kDefaultBufferSize),
    58   mInputFrameBufferUsed(0),
    59   mInputFrameDataLast(false),
    60   mInputFrameDataStream(nullptr),
    61   mNeedsCleanup(nullptr),
    62   mShouldGoAway(false),
    63   mClosed(false),
    64   mCleanShutdown(false),
    65   mDataPending(false),
    66   mGoAwayID(0),
    67   mMaxConcurrent(kDefaultMaxConcurrent),
    68   mConcurrent(0),
    69   mServerPushedResources(0),
    70   mServerInitialStreamWindow(kDefaultRwin),
    71   mLocalSessionWindow(kDefaultRwin),
    72   mRemoteSessionWindow(kDefaultRwin),
    73   mOutputQueueSize(kDefaultQueueSize),
    74   mOutputQueueUsed(0),
    75   mOutputQueueSent(0),
    76   mLastReadEpoch(PR_IntervalNow()),
    77   mPingSentEpoch(0),
    78   mNextPingID(1)
    79 {
    80   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
    82   static uint64_t sSerial;
    83   mSerial = ++sSerial;
    85   LOG3(("SpdySession31::SpdySession31 %p transaction 1 = %p serial=0x%X\n",
    86         this, aHttpTransaction, mSerial));
    88   mConnection = aHttpTransaction->Connection();
    89   mInputFrameBuffer = new char[mInputFrameBufferSize];
    90   mOutputQueueBuffer = new char[mOutputQueueSize];
    91   zlibInit();
    93   mPushAllowance = gHttpHandler->SpdyPushAllowance();
    95   mSendingChunkSize = gHttpHandler->SpdySendingChunkSize();
    96   GenerateSettings();
    98   if (!aHttpTransaction->IsNullTransaction())
    99     AddStream(aHttpTransaction, firstPriority);
   100   mLastDataReadEpoch = mLastReadEpoch;
   102   mPingThreshold = gHttpHandler->SpdyPingThreshold();
   103 }
   105 PLDHashOperator
   106   SpdySession31::ShutdownEnumerator(nsAHttpTransaction *key,
   107                                     nsAutoPtr<SpdyStream31> &stream,
   108                                     void *closure)
   109 {
   110   SpdySession31 *self = static_cast<SpdySession31 *>(closure);
   112   // On a clean server hangup the server sets the GoAwayID to be the ID of
   113   // the last transaction it processed. If the ID of stream in the
   114   // local stream is greater than that it can safely be restarted because the
   115   // server guarantees it was not partially processed. Streams that have not
   116   // registered an ID haven't actually been sent yet so they can always be
   117   // restarted.
   118   if (self->mCleanShutdown &&
   119       (stream->StreamID() > self->mGoAwayID || !stream->HasRegisteredID()))
   120     self->CloseStream(stream, NS_ERROR_NET_RESET); // can be restarted
   121   else
   122     self->CloseStream(stream, NS_ERROR_ABORT);
   124   return PL_DHASH_NEXT;
   125 }
   127 PLDHashOperator
   128 SpdySession31::GoAwayEnumerator(nsAHttpTransaction *key,
   129                                 nsAutoPtr<SpdyStream31> &stream,
   130                                 void *closure)
   131 {
   132   SpdySession31 *self = static_cast<SpdySession31 *>(closure);
   134   // these streams were not processed by the server and can be restarted.
   135   // Do that after the enumerator completes to avoid the risk of
   136   // a restart event re-entrantly modifying this hash. Be sure not to restart
   137   // a pushed (even numbered) stream
   138   if ((stream->StreamID() > self->mGoAwayID && (stream->StreamID() & 1)) ||
   139       !stream->HasRegisteredID()) {
   140     self->mGoAwayStreamsToRestart.Push(stream);
   141   }
   143   return PL_DHASH_NEXT;
   144 }
   146 SpdySession31::~SpdySession31()
   147 {
   148   LOG3(("SpdySession31::~SpdySession31 %p mDownstreamState=%X",
   149         this, mDownstreamState));
   151   inflateEnd(&mDownstreamZlib);
   152   deflateEnd(&mUpstreamZlib);
   154   mStreamTransactionHash.Enumerate(ShutdownEnumerator, this);
   155   Telemetry::Accumulate(Telemetry::SPDY_PARALLEL_STREAMS, mConcurrentHighWater);
   156   Telemetry::Accumulate(Telemetry::SPDY_REQUEST_PER_CONN, (mNextStreamID - 1) / 2);
   157   Telemetry::Accumulate(Telemetry::SPDY_SERVER_INITIATED_STREAMS,
   158                         mServerPushedResources);
   159 }
   161 void
   162 SpdySession31::LogIO(SpdySession31 *self, SpdyStream31 *stream, const char *label,
   163                      const char *data, uint32_t datalen)
   164 {
   165   if (!LOG4_ENABLED())
   166     return;
   168   LOG4(("SpdySession31::LogIO %p stream=%p id=0x%X [%s]",
   169         self, stream, stream ? stream->StreamID() : 0, label));
   171   // Max line is (16 * 3) + 10(prefix) + newline + null
   172   char linebuf[128];
   173   uint32_t index;
   174   char *line = linebuf;
   176   linebuf[127] = 0;
   178   for (index = 0; index < datalen; ++index) {
   179     if (!(index % 16)) {
   180       if (index) {
   181         *line = 0;
   182         LOG4(("%s", linebuf));
   183       }
   184       line = linebuf;
   185       PR_snprintf(line, 128, "%08X: ", index);
   186       line += 10;
   187     }
   188     PR_snprintf(line, 128 - (line - linebuf), "%02X ",
   189                 ((unsigned char *)data)[index]);
   190     line += 3;
   191   }
   192   if (index) {
   193     *line = 0;
   194     LOG4(("%s", linebuf));
   195   }
   196 }
   198 bool
   199 SpdySession31::RoomForMoreConcurrent()
   200 {
   201   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   203   return (mConcurrent < mMaxConcurrent);
   204 }
   206 bool
   207 SpdySession31::RoomForMoreStreams()
   208 {
   209   if (mNextStreamID + mStreamTransactionHash.Count() * 2 > kMaxStreamID)
   210     return false;
   212   return !mShouldGoAway;
   213 }
   215 PRIntervalTime
   216 SpdySession31::IdleTime()
   217 {
   218   return PR_IntervalNow() - mLastDataReadEpoch;
   219 }
   221 uint32_t
   222 SpdySession31::ReadTimeoutTick(PRIntervalTime now)
   223 {
   224   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   225   MOZ_ASSERT(mNextPingID & 1, "Ping Counter Not Odd");
   227   LOG(("SpdySession31::ReadTimeoutTick %p delta since last read %ds\n",
   228        this, PR_IntervalToSeconds(now - mLastReadEpoch)));
   230   if (!mPingThreshold)
   231     return UINT32_MAX;
   233   if ((now - mLastReadEpoch) < mPingThreshold) {
   234     // recent activity means ping is not an issue
   235     if (mPingSentEpoch)
   236       mPingSentEpoch = 0;
   238     return PR_IntervalToSeconds(mPingThreshold) -
   239       PR_IntervalToSeconds(now - mLastReadEpoch);
   240   }
   242   if (mPingSentEpoch) {
   243     LOG(("SpdySession31::ReadTimeoutTick %p handle outstanding ping\n", this));
   244     if ((now - mPingSentEpoch) >= gHttpHandler->SpdyPingTimeout()) {
   245       LOG(("SpdySession31::ReadTimeoutTick %p Ping Timer Exhaustion\n",
   246            this));
   247       mPingSentEpoch = 0;
   248       Close(NS_ERROR_NET_TIMEOUT);
   249       return UINT32_MAX;
   250     }
   251     return 1; // run the tick aggressively while ping is outstanding
   252   }
   254   LOG(("SpdySession31::ReadTimeoutTick %p generating ping 0x%X\n",
   255        this, mNextPingID));
   257   if (mNextPingID == 0xffffffff) {
   258     LOG(("SpdySession31::ReadTimeoutTick %p cannot form ping - ids exhausted\n",
   259          this));
   260     return UINT32_MAX;
   261   }
   263   mPingSentEpoch = PR_IntervalNow();
   264   if (!mPingSentEpoch)
   265     mPingSentEpoch = 1; // avoid the 0 sentinel value
   266   GeneratePing(mNextPingID);
   267   mNextPingID += 2;
   268   ResumeRecv(); // read the ping reply
   270   // Check for orphaned push streams. This looks expensive, but generally the
   271   // list is empty.
   272   SpdyPushedStream31 *deleteMe;
   273   TimeStamp timestampNow;
   274   do {
   275     deleteMe = nullptr;
   277     for (uint32_t index = mPushedStreams.Length();
   278          index > 0 ; --index) {
   279       SpdyPushedStream31 *pushedStream = mPushedStreams[index - 1];
   281       if (timestampNow.IsNull())
   282         timestampNow = TimeStamp::Now(); // lazy initializer
   284       // if spdy finished, but not connected, and its been like that for too long..
   285       // cleanup the stream..
   286       if (pushedStream->IsOrphaned(timestampNow))
   287       {
   288         LOG3(("SpdySession31 Timeout Pushed Stream %p 0x%X\n",
   289               this, pushedStream->StreamID()));
   290         deleteMe = pushedStream;
   291         break; // don't CleanupStream() while iterating this vector
   292       }
   293     }
   294     if (deleteMe)
   295       CleanupStream(deleteMe, NS_ERROR_ABORT, RST_CANCEL);
   297   } while (deleteMe);
   299   if (mNextPingID == 0xffffffff) {
   300     LOG(("SpdySession31::ReadTimeoutTick %p "
   301          "ping ids exhausted marking goaway\n", this));
   302     mShouldGoAway = true;
   303   }
   304   return 1; // run the tick aggressively while ping is outstanding
   305 }
   307 uint32_t
   308 SpdySession31::RegisterStreamID(SpdyStream31 *stream, uint32_t aNewID)
   309 {
   310   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   312   MOZ_ASSERT(mNextStreamID < 0xfffffff0,
   313              "should have stopped admitting streams");
   315   MOZ_ASSERT(!(aNewID & 1),
   316              "0 for autoassign pull, otherwise explicit even push assignment");
   317   if (!aNewID) {
   318     // auto generate a new pull stream ID
   319     aNewID = mNextStreamID;
   320     MOZ_ASSERT(aNewID & 1, "pull ID must be odd.");
   321     mNextStreamID += 2;
   322   }
   324   LOG3(("SpdySession31::RegisterStreamID session=%p stream=%p id=0x%X "
   325         "concurrent=%d",this, stream, aNewID, mConcurrent));
   327   // We've used up plenty of ID's on this session. Start
   328   // moving to a new one before there is a crunch involving
   329   // server push streams or concurrent non-registered submits
   330   if (aNewID >= kMaxStreamID)
   331     mShouldGoAway = true;
   333   // integrity check
   334   if (mStreamIDHash.Get(aNewID)) {
   335     LOG3(("   New ID already present\n"));
   336     MOZ_ASSERT(false, "New ID already present in mStreamIDHash");
   337     mShouldGoAway = true;
   338     return kDeadStreamID;
   339   }
   341   mStreamIDHash.Put(aNewID, stream);
   342   return aNewID;
   343 }
   345 bool
   346 SpdySession31::AddStream(nsAHttpTransaction *aHttpTransaction,
   347                            int32_t aPriority)
   348 {
   349   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   351   // integrity check
   352   if (mStreamTransactionHash.Get(aHttpTransaction)) {
   353     LOG3(("   New transaction already present\n"));
   354     MOZ_ASSERT(false, "AddStream duplicate transaction pointer");
   355     return false;
   356   }
   358   aHttpTransaction->SetConnection(this);
   359   SpdyStream31 *stream = new SpdyStream31(aHttpTransaction, this, aPriority);
   361   LOG3(("SpdySession31::AddStream session=%p stream=%p NextID=0x%X (tentative)",
   362         this, stream, mNextStreamID));
   364   mStreamTransactionHash.Put(aHttpTransaction, stream);
   366   if (RoomForMoreConcurrent()) {
   367     LOG3(("SpdySession31::AddStream %p stream %p activated immediately.",
   368           this, stream));
   369     ActivateStream(stream);
   370   }
   371   else {
   372     LOG3(("SpdySession31::AddStream %p stream %p queued.", this, stream));
   373     mQueuedStreams.Push(stream);
   374   }
   376   if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE)) {
   377     LOG3(("SpdySession31::AddStream %p transaction %p forces keep-alive off.\n",
   378           this, aHttpTransaction));
   379     DontReuse();
   380   }
   382   return true;
   383 }
   385 void
   386 SpdySession31::ActivateStream(SpdyStream31 *stream)
   387 {
   388   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   389   MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1),
   390              "Do not activate pushed streams");
   392   ++mConcurrent;
   393   if (mConcurrent > mConcurrentHighWater)
   394     mConcurrentHighWater = mConcurrent;
   395   LOG3(("SpdySession31::AddStream %p activating stream %p Currently %d "
   396         "streams in session, high water mark is %d",
   397         this, stream, mConcurrent, mConcurrentHighWater));
   399   mReadyForWrite.Push(stream);
   400   SetWriteCallbacks();
   402   // Kick off the SYN transmit without waiting for the poll loop
   403   // This won't work for stream id=1 because there is no segment reader
   404   // yet.
   405   if (mSegmentReader) {
   406     uint32_t countRead;
   407     ReadSegments(nullptr, kDefaultBufferSize, &countRead);
   408   }
   409 }
   411 void
   412 SpdySession31::ProcessPending()
   413 {
   414   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   416   while (RoomForMoreConcurrent()) {
   417     SpdyStream31 *stream = static_cast<SpdyStream31 *>(mQueuedStreams.PopFront());
   418     if (!stream)
   419       return;
   420     LOG3(("SpdySession31::ProcessPending %p stream %p activated from queue.",
   421           this, stream));
   422     ActivateStream(stream);
   423   }
   424 }
   426 nsresult
   427 SpdySession31::NetworkRead(nsAHttpSegmentWriter *writer, char *buf,
   428                            uint32_t count, uint32_t *countWritten)
   429 {
   430   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   432   if (!count) {
   433     *countWritten = 0;
   434     return NS_OK;
   435   }
   437   nsresult rv = writer->OnWriteSegment(buf, count, countWritten);
   438   if (NS_SUCCEEDED(rv) && *countWritten > 0)
   439     mLastReadEpoch = PR_IntervalNow();
   440   return rv;
   441 }
   443 void
   444 SpdySession31::SetWriteCallbacks()
   445 {
   446   if (mConnection && (GetWriteQueueSize() || mOutputQueueUsed))
   447     mConnection->ResumeSend();
   448 }
   450 void
   451 SpdySession31::RealignOutputQueue()
   452 {
   453   mOutputQueueUsed -= mOutputQueueSent;
   454   memmove(mOutputQueueBuffer.get(),
   455           mOutputQueueBuffer.get() + mOutputQueueSent,
   456           mOutputQueueUsed);
   457   mOutputQueueSent = 0;
   458 }
   460 void
   461 SpdySession31::FlushOutputQueue()
   462 {
   463   if (!mSegmentReader || !mOutputQueueUsed)
   464     return;
   466   nsresult rv;
   467   uint32_t countRead;
   468   uint32_t avail = mOutputQueueUsed - mOutputQueueSent;
   470   rv = mSegmentReader->
   471     OnReadSegment(mOutputQueueBuffer.get() + mOutputQueueSent, avail,
   472                   &countRead);
   473   LOG3(("SpdySession31::FlushOutputQueue %p sz=%d rv=%x actual=%d",
   474         this, avail, rv, countRead));
   476   // Dont worry about errors on write, we will pick this up as a read error too
   477   if (NS_FAILED(rv))
   478     return;
   480   if (countRead == avail) {
   481     mOutputQueueUsed = 0;
   482     mOutputQueueSent = 0;
   483     return;
   484   }
   486   mOutputQueueSent += countRead;
   488   // If the output queue is close to filling up and we have sent out a good
   489   // chunk of data from the beginning then realign it.
   491   if ((mOutputQueueSent >= kQueueMinimumCleanup) &&
   492       ((mOutputQueueSize - mOutputQueueUsed) < kQueueTailRoom)) {
   493     RealignOutputQueue();
   494   }
   495 }
   497 void
   498 SpdySession31::DontReuse()
   499 {
   500   mShouldGoAway = true;
   501   if (!mStreamTransactionHash.Count())
   502     Close(NS_OK);
   503 }
   505 uint32_t
   506 SpdySession31::GetWriteQueueSize()
   507 {
   508   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   510   return mReadyForWrite.GetSize();
   511 }
   513 void
   514 SpdySession31::ChangeDownstreamState(enum stateType newState)
   515 {
   516   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   518   LOG3(("SpdyStream31::ChangeDownstreamState() %p from %X to %X",
   519         this, mDownstreamState, newState));
   520   mDownstreamState = newState;
   521 }
   523 void
   524 SpdySession31::ResetDownstreamState()
   525 {
   526   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   528   LOG3(("SpdyStream31::ResetDownstreamState() %p", this));
   529   ChangeDownstreamState(BUFFERING_FRAME_HEADER);
   531   if (mInputFrameDataLast && mInputFrameDataStream) {
   532     mInputFrameDataLast = false;
   533     if (!mInputFrameDataStream->RecvdFin()) {
   534       LOG3(("  SetRecvdFin id=0x%x\n", mInputFrameDataStream->StreamID()));
   535       mInputFrameDataStream->SetRecvdFin(true);
   536       DecrementConcurrent(mInputFrameDataStream);
   537     }
   538   }
   539   mInputFrameBufferUsed = 0;
   540   mInputFrameDataStream = nullptr;
   541 }
   543 template<typename T> void
   544 SpdySession31::EnsureBuffer(nsAutoArrayPtr<T> &buf,
   545                             uint32_t newSize,
   546                             uint32_t preserve,
   547                             uint32_t &objSize)
   548 {
   549   if (objSize >= newSize)
   550     return;
   552   // Leave a little slop on the new allocation - add 2KB to
   553   // what we need and then round the result up to a 4KB (page)
   554   // boundary.
   556   objSize = (newSize + 2048 + 4095) & ~4095;
   558   static_assert(sizeof(T) == 1, "sizeof(T) must be 1");
   559   nsAutoArrayPtr<T> tmp(new T[objSize]);
   560   memcpy(tmp, buf, preserve);
   561   buf = tmp;
   562 }
   564 // Instantiate supported templates explicitly.
   565 template void
   566 SpdySession31::EnsureBuffer(nsAutoArrayPtr<char> &buf,
   567                             uint32_t newSize,
   568                             uint32_t preserve,
   569                             uint32_t &objSize);
   571 template void
   572 SpdySession31::EnsureBuffer(nsAutoArrayPtr<uint8_t> &buf,
   573                             uint32_t newSize,
   574                             uint32_t preserve,
   575                             uint32_t &objSize);
   577 void
   578 SpdySession31::DecrementConcurrent(SpdyStream31 *aStream)
   579 {
   580   uint32_t id = aStream->StreamID();
   582   if (id && !(id & 0x1))
   583     return; // pushed streams aren't counted in concurrent limit
   585   MOZ_ASSERT(mConcurrent);
   586   --mConcurrent;
   587   LOG3(("DecrementConcurrent %p id=0x%X concurrent=%d\n",
   588         this, id, mConcurrent));
   589   ProcessPending();
   590 }
   592 void
   593 SpdySession31::zlibInit()
   594 {
   595   mDownstreamZlib.zalloc = SpdyZlibReporter::Alloc;
   596   mDownstreamZlib.zfree = SpdyZlibReporter::Free;
   597   mDownstreamZlib.opaque = Z_NULL;
   599   inflateInit(&mDownstreamZlib);
   601   mUpstreamZlib.zalloc = SpdyZlibReporter::Alloc;
   602   mUpstreamZlib.zfree = SpdyZlibReporter::Free;
   603   mUpstreamZlib.opaque = Z_NULL;
   605   // mixing carte blanche compression with tls subjects us to traffic
   606   // analysis attacks
   607   deflateInit(&mUpstreamZlib, Z_NO_COMPRESSION);
   608   deflateSetDictionary(&mUpstreamZlib,
   609                        SpdyStream31::kDictionary,
   610                        sizeof(SpdyStream31::kDictionary));
   611 }
   613 // Need to decompress some data in order to keep the compression
   614 // context correct, but we really don't care what the result is
   615 nsresult
   616 SpdySession31::UncompressAndDiscard(uint32_t offset,
   617                                     uint32_t blockLen)
   618 {
   619   char *blockStart = mInputFrameBuffer + offset;
   620   unsigned char trash[2048];
   621   mDownstreamZlib.avail_in = blockLen;
   622   mDownstreamZlib.next_in = reinterpret_cast<unsigned char *>(blockStart);
   623   bool triedDictionary = false;
   625   do {
   626     mDownstreamZlib.next_out = trash;
   627     mDownstreamZlib.avail_out = sizeof(trash);
   628     int zlib_rv = inflate(&mDownstreamZlib, Z_NO_FLUSH);
   630     if (zlib_rv == Z_NEED_DICT) {
   631       if (triedDictionary) {
   632         LOG3(("SpdySession31::UncompressAndDiscard %p Dictionary Error\n", this));
   633         return NS_ERROR_ILLEGAL_VALUE;
   634       }
   636       triedDictionary = true;
   637       inflateSetDictionary(&mDownstreamZlib, SpdyStream31::kDictionary,
   638                            sizeof(SpdyStream31::kDictionary));
   639     }
   641     if (zlib_rv == Z_DATA_ERROR)
   642       return NS_ERROR_ILLEGAL_VALUE;
   644     if (zlib_rv == Z_MEM_ERROR)
   645       return NS_ERROR_FAILURE;
   646   }
   647   while (mDownstreamZlib.avail_in);
   648   return NS_OK;
   649 }
   651 void
   652 SpdySession31::GeneratePing(uint32_t aID)
   653 {
   654   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   655   LOG3(("SpdySession31::GeneratePing %p 0x%X\n", this, aID));
   657   EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 12,
   658                mOutputQueueUsed, mOutputQueueSize);
   659   char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
   660   mOutputQueueUsed += 12;
   662   packet[0] = kFlag_Control;
   663   packet[1] = kVersion;
   664   packet[2] = 0;
   665   packet[3] = CONTROL_TYPE_PING;
   666   packet[4] = 0;                                  /* flags */
   667   packet[5] = 0;
   668   packet[6] = 0;
   669   packet[7] = 4;                                  /* length */
   671   aID = PR_htonl(aID);
   672   memcpy(packet + 8, &aID, 4);
   674   LogIO(this, nullptr, "Generate Ping", packet, 12);
   675   FlushOutputQueue();
   676 }
   678 void
   679 SpdySession31::GenerateRstStream(uint32_t aStatusCode, uint32_t aID)
   680 {
   681   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   682   LOG3(("SpdySession31::GenerateRst %p 0x%X %d\n", this, aID, aStatusCode));
   684   EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 16,
   685                mOutputQueueUsed, mOutputQueueSize);
   686   char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
   687   mOutputQueueUsed += 16;
   689   packet[0] = kFlag_Control;
   690   packet[1] = kVersion;
   691   packet[2] = 0;
   692   packet[3] = CONTROL_TYPE_RST_STREAM;
   693   packet[4] = 0;                                  /* flags */
   694   packet[5] = 0;
   695   packet[6] = 0;
   696   packet[7] = 8;                                  /* length */
   698   aID = PR_htonl(aID);
   699   memcpy(packet + 8, &aID, 4);
   700   aStatusCode = PR_htonl(aStatusCode);
   701   memcpy(packet + 12, &aStatusCode, 4);
   703   LogIO(this, nullptr, "Generate Reset", packet, 16);
   704   FlushOutputQueue();
   705 }
   707 void
   708 SpdySession31::GenerateGoAway(uint32_t aStatusCode)
   709 {
   710   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   711   LOG3(("SpdySession31::GenerateGoAway %p code=%X\n", this, aStatusCode));
   713   EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 16,
   714                mOutputQueueUsed, mOutputQueueSize);
   715   char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
   716   mOutputQueueUsed += 16;
   718   memset(packet, 0, 16);
   719   packet[0] = kFlag_Control;
   720   packet[1] = kVersion;
   721   packet[3] = CONTROL_TYPE_GOAWAY;
   722   packet[7] = 8;                                  /* data length */
   724   // last-good-stream-id are bytes 8-11, when we accept server push this will
   725   // need to be set non zero
   727   // bytes 12-15 are the status code.
   728   aStatusCode = PR_htonl(aStatusCode);
   729   memcpy(packet + 12, &aStatusCode, 4);
   731   LogIO(this, nullptr, "Generate GoAway", packet, 16);
   732   FlushOutputQueue();
   733 }
   735 void
   736 SpdySession31::GenerateSettings()
   737 {
   738   uint32_t sessionWindowBump = ASpdySession::kInitialRwin - kDefaultRwin;
   739   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   740   LOG3(("SpdySession31::GenerateSettings %p\n", this));
   742 // sized for 3 settings and a session window update to follow
   743   static const uint32_t maxDataLen = 4 + 3 * 8 + 16;
   744   EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + 8 + maxDataLen,
   745                mOutputQueueUsed, mOutputQueueSize);
   746   char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
   748   memset(packet, 0, 8 + maxDataLen);
   749   packet[0] = kFlag_Control;
   750   packet[1] = kVersion;
   751   packet[3] = CONTROL_TYPE_SETTINGS;
   753   uint8_t numberOfEntries = 0;
   755   // entries need to be listed in order by ID
   756   // 1st entry is bytes 12 to 19
   757   // 2nd entry is bytes 20 to 27
   758   // 3rd entry is bytes 28 to 35
   760   if (!gHttpHandler->AllowPush()) {
   761     // announcing that we accept 0 incoming streams is done to
   762     // disable server push
   763     packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_MAX_CONCURRENT;
   764     // The value portion of the setting pair is already initialized to 0
   765     numberOfEntries++;
   766   }
   768   nsRefPtr<nsHttpConnectionInfo> ci;
   769   uint32_t cwnd = 0;
   770   GetConnectionInfo(getter_AddRefs(ci));
   771   if (ci)
   772     cwnd = gHttpHandler->ConnMgr()->GetSpdyCWNDSetting(ci);
   773   if (cwnd) {
   774     packet[12 + 8 * numberOfEntries] = PERSISTED_VALUE;
   775     packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_CWND;
   776     LOG(("SpdySession31::GenerateSettings %p sending CWND %u\n", this, cwnd));
   777     cwnd = PR_htonl(cwnd);
   778     memcpy(packet + 16 + 8 * numberOfEntries, &cwnd, 4);
   779     numberOfEntries++;
   780   }
   782   // Advertise the Push RWIN and on each client SYN_STREAM pipeline
   783   // a window update with it in order to use larger initial windows with pulled
   784   // streams.
   785   packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_INITIAL_WINDOW;
   786   uint32_t rwin = PR_htonl(mPushAllowance);
   787   memcpy(packet + 16 + 8 * numberOfEntries, &rwin, 4);
   788   numberOfEntries++;
   790   uint32_t dataLen = 4 + 8 * numberOfEntries;
   791   mOutputQueueUsed += 8 + dataLen;
   792   packet[7] = dataLen;
   793   packet[11] = numberOfEntries;
   795   LogIO(this, nullptr, "Generate Settings", packet, 8 + dataLen);
   797   if (kDefaultRwin >= ASpdySession::kInitialRwin)
   798     goto generateSettings_complete;
   800   // send a window update for the session (Stream 0) for something large
   801   sessionWindowBump = PR_htonl(sessionWindowBump);
   802   mLocalSessionWindow = ASpdySession::kInitialRwin;
   804   packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
   805   mOutputQueueUsed += 16;
   807   packet[0] = kFlag_Control;
   808   packet[1] = kVersion;
   809   packet[3] = CONTROL_TYPE_WINDOW_UPDATE;
   810   packet[7] = 8; // 8 data bytes after 8 byte header
   812   // 8 to 11 stay 0 bytes for id = 0
   813   memcpy(packet + 12, &sessionWindowBump, 4);
   815   LOG3(("Session Window increase at start of session %p %u\n",
   816         this, PR_ntohl(sessionWindowBump)));
   817   LogIO(this, nullptr, "Session Window Bump ", packet, 16);
   819 generateSettings_complete:
   820   FlushOutputQueue();
   821 }
   823 // perform a bunch of integrity checks on the stream.
   824 // returns true if passed, false (plus LOG and ABORT) if failed.
   825 bool
   826 SpdySession31::VerifyStream(SpdyStream31 *aStream, uint32_t aOptionalID = 0)
   827 {
   828   // This is annoying, but at least it is O(1)
   829   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   831 #ifndef DEBUG
   832   // Only do the real verification in debug builds
   833   return true;
   834 #endif
   836   if (!aStream)
   837     return true;
   839   uint32_t test = 0;
   841   do {
   842     if (aStream->StreamID() == kDeadStreamID)
   843       break;
   845     nsAHttpTransaction *trans = aStream->Transaction();
   847     test++;
   848     if (!trans)
   849       break;
   851     test++;
   852     if (mStreamTransactionHash.Get(trans) != aStream)
   853       break;
   855     if (aStream->StreamID()) {
   856       SpdyStream31 *idStream = mStreamIDHash.Get(aStream->StreamID());
   858       test++;
   859       if (idStream != aStream)
   860         break;
   862       if (aOptionalID) {
   863         test++;
   864         if (idStream->StreamID() != aOptionalID)
   865           break;
   866       }
   867     }
   869     // tests passed
   870     return true;
   871   } while (0);
   873   LOG(("SpdySession31 %p VerifyStream Failure %p stream->id=0x%X "
   874        "optionalID=0x%X trans=%p test=%d\n",
   875        this, aStream, aStream->StreamID(),
   876        aOptionalID, aStream->Transaction(), test));
   878   MOZ_ASSERT(false, "VerifyStream");
   879   return false;
   880 }
   882 void
   883 SpdySession31::CleanupStream(SpdyStream31 *aStream, nsresult aResult,
   884                              rstReason aResetCode)
   885 {
   886   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   887   LOG3(("SpdySession31::CleanupStream %p %p 0x%X %X\n",
   888         this, aStream, aStream ? aStream->StreamID() : 0, aResult));
   889   if (!aStream) {
   890     return;
   891   }
   893   SpdyPushedStream31 *pushSource = nullptr;
   895   if (NS_SUCCEEDED(aResult) && aStream->DeferCleanupOnSuccess()) {
   896     LOG(("SpdySession31::CleanupStream 0x%X deferred\n", aStream->StreamID()));
   897     return;
   898   }
   900   if (!VerifyStream(aStream)) {
   901     LOG(("SpdySession31::CleanupStream failed to verify stream\n"));
   902     return;
   903   }
   905   pushSource = aStream->PushSource();
   907   if (!aStream->RecvdFin() && aStream->StreamID()) {
   908     LOG3(("Stream had not processed recv FIN, sending RST code %X\n",
   909           aResetCode));
   910     GenerateRstStream(aResetCode, aStream->StreamID());
   911     DecrementConcurrent(aStream);
   912   }
   914   CloseStream(aStream, aResult);
   916   // Remove the stream from the ID hash table and, if an even id, the pushed
   917   // table too.
   918   uint32_t id = aStream->StreamID();
   919   if (id > 0) {
   920     mStreamIDHash.Remove(id);
   921     if (!(id & 1))
   922       mPushedStreams.RemoveElement(aStream);
   923   }
   925   RemoveStreamFromQueues(aStream);
   927   // removing from the stream transaction hash will
   928   // delete the SpdyStream31 and drop the reference to
   929   // its transaction
   930   mStreamTransactionHash.Remove(aStream->Transaction());
   932   if (mShouldGoAway && !mStreamTransactionHash.Count())
   933     Close(NS_OK);
   935   if (pushSource) {
   936     pushSource->SetDeferCleanupOnSuccess(false);
   937     CleanupStream(pushSource, aResult, aResetCode);
   938   }
   939 }
   941 static void RemoveStreamFromQueue(SpdyStream31 *aStream, nsDeque &queue)
   942 {
   943   uint32_t size = queue.GetSize();
   944   for (uint32_t count = 0; count < size; ++count) {
   945     SpdyStream31 *stream = static_cast<SpdyStream31 *>(queue.PopFront());
   946     if (stream != aStream)
   947       queue.Push(stream);
   948   }
   949 }
   951 void
   952 SpdySession31::RemoveStreamFromQueues(SpdyStream31 *aStream)
   953 {
   954   RemoveStreamFromQueue(aStream, mReadyForWrite);
   955   RemoveStreamFromQueue(aStream, mQueuedStreams);
   956   RemoveStreamFromQueue(aStream, mReadyForRead);
   957 }
   959 void
   960 SpdySession31::CloseStream(SpdyStream31 *aStream, nsresult aResult)
   961 {
   962   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
   963   LOG3(("SpdySession31::CloseStream %p %p 0x%x %X\n",
   964         this, aStream, aStream->StreamID(), aResult));
   966   // Check if partial frame reader
   967   if (aStream == mInputFrameDataStream) {
   968     LOG3(("Stream had active partial read frame on close"));
   969     ChangeDownstreamState(DISCARDING_DATA_FRAME);
   970     mInputFrameDataStream = nullptr;
   971   }
   973   RemoveStreamFromQueues(aStream);
   975   // Send the stream the close() indication
   976   aStream->Close(aResult);
   977 }
   979 nsresult
   980 SpdySession31::HandleSynStream(SpdySession31 *self)
   981 {
   982   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_SYN_STREAM);
   984   if (self->mInputFrameDataSize < 18) {
   985     LOG3(("SpdySession31::HandleSynStream %p SYN_STREAM too short data=%d",
   986           self, self->mInputFrameDataSize));
   987     return NS_ERROR_ILLEGAL_VALUE;
   988   }
   990   uint32_t streamID =
   991     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
   992   uint32_t associatedID =
   993     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[3]);
   994   uint8_t flags = reinterpret_cast<uint8_t *>(self->mInputFrameBuffer.get())[4];
   996   LOG3(("SpdySession31::HandleSynStream %p recv SYN_STREAM (push) "
   997         "for ID 0x%X associated with 0x%X.\n",
   998         self, streamID, associatedID));
  1000   if (streamID & 0x01) {                   // test for odd stream ID
  1001     LOG3(("SpdySession31::HandleSynStream %p recvd SYN_STREAM id must be even.",
  1002           self));
  1003     return NS_ERROR_ILLEGAL_VALUE;
  1006   // confirm associated-to
  1007   nsresult rv = self->SetInputFrameDataStream(associatedID);
  1008   if (NS_FAILED(rv))
  1009     return rv;
  1010   SpdyStream31 *associatedStream = self->mInputFrameDataStream;
  1012   ++(self->mServerPushedResources);
  1014   // Anytime we start using the high bit of stream ID (either client or server)
  1015   // begin to migrate to a new session.
  1016   if (streamID >= kMaxStreamID)
  1017     self->mShouldGoAway = true;
  1019   bool resetStream = true;
  1020   SpdyPushCache *cache = nullptr;
  1022   if (!(flags & kFlag_Data_UNI)) {
  1023     // pushed streams require UNIDIRECTIONAL flag
  1024     LOG3(("SpdySession31::HandleSynStream %p ID %0x%X associated ID 0x%X failed.\n",
  1025           self, streamID, associatedID));
  1026     self->GenerateRstStream(RST_PROTOCOL_ERROR, streamID);
  1028   } else if (!associatedID) {
  1029     // associated stream 0 will never find a match, but the spec requires a
  1030     // PROTOCOL_ERROR in this specific case
  1031     LOG3(("SpdySession31::HandleSynStream %p associated ID of 0 failed.\n", self));
  1032     self->GenerateRstStream(RST_PROTOCOL_ERROR, streamID);
  1034   } else if (!gHttpHandler->AllowPush()) {
  1035     // MAX_CONCURRENT_STREAMS of 0 in settings should have disabled push,
  1036     // but some servers are buggy about that.. or the config could have
  1037     // been updated after the settings frame was sent. In both cases just
  1038     // reject the pushed stream as refused
  1039     LOG3(("SpdySession31::HandleSynStream Push Recevied when Disabled\n"));
  1040     self->GenerateRstStream(RST_REFUSED_STREAM, streamID);
  1042   } else if (!associatedStream) {
  1043     LOG3(("SpdySession31::HandleSynStream %p lookup associated ID failed.\n", self));
  1044     self->GenerateRstStream(RST_INVALID_STREAM, streamID);
  1046   } else {
  1047     nsILoadGroupConnectionInfo *loadGroupCI = associatedStream->LoadGroupConnectionInfo();
  1048     if (loadGroupCI) {
  1049       loadGroupCI->GetSpdyPushCache(&cache);
  1050       if (!cache) {
  1051         cache = new SpdyPushCache();
  1052         if (!cache || NS_FAILED(loadGroupCI->SetSpdyPushCache(cache))) {
  1053           delete cache;
  1054           cache = nullptr;
  1058     if (!cache) {
  1059       // this is unexpected, but we can handle it just be refusing the push
  1060       LOG3(("SpdySession31::HandleSynStream Push Recevied without loadgroup cache\n"));
  1061       self->GenerateRstStream(RST_REFUSED_STREAM, streamID);
  1063     else {
  1064       resetStream = false;
  1068   if (resetStream) {
  1069     // Need to decompress the headers even though we aren't using them yet in
  1070     // order to keep the compression context consistent for other syn_reply frames
  1071     rv = self->UncompressAndDiscard(18, self->mInputFrameDataSize - 10);
  1072     if (NS_FAILED(rv)) {
  1073       LOG(("SpdySession31::HandleSynStream uncompress failed\n"));
  1074       return rv;
  1076     self->ResetDownstreamState();
  1077     return NS_OK;
  1080   // Create the buffering transaction and push stream
  1081   nsRefPtr<SpdyPush31TransactionBuffer> transactionBuffer =
  1082     new SpdyPush31TransactionBuffer();
  1083   transactionBuffer->SetConnection(self);
  1084   SpdyPushedStream31 *pushedStream =
  1085     new SpdyPushedStream31(transactionBuffer, self,
  1086                            associatedStream, streamID);
  1088   // ownership of the pushed stream is by the transaction hash, just as it
  1089   // is for a client initiated stream. Errors that aren't fatal to the
  1090   // whole session must call cleanupStream() after this point in order
  1091   // to remove the stream from that hash.
  1092   self->mStreamTransactionHash.Put(transactionBuffer, pushedStream);
  1093   self->mPushedStreams.AppendElement(pushedStream);
  1095   // The pushed stream is unidirectional so it is fully open immediately
  1096   pushedStream->SetFullyOpen();
  1098   // Uncompress the response headers into a stream specific buffer, leaving them
  1099   // in spdy format for the time being.
  1100   rv = pushedStream->Uncompress(&self->mDownstreamZlib,
  1101                                 self->mInputFrameBuffer + 18,
  1102                                 self->mInputFrameDataSize - 10);
  1103   if (NS_FAILED(rv)) {
  1104     LOG(("SpdySession31::HandleSynStream uncompress failed\n"));
  1105     return rv;
  1108   if (self->RegisterStreamID(pushedStream, streamID) == kDeadStreamID) {
  1109     LOG(("SpdySession31::HandleSynStream registerstreamid failed\n"));
  1110     return NS_ERROR_FAILURE;
  1113   // Fake the request side of the pushed HTTP transaction. Sets up hash
  1114   // key and origin
  1115   uint32_t notUsed;
  1116   pushedStream->ReadSegments(nullptr, 1, &notUsed);
  1118   nsAutoCString key;
  1119   if (!pushedStream->GetHashKey(key)) {
  1120     LOG(("SpdySession31::HandleSynStream one of :host :scheme :path missing from push\n"));
  1121     self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM);
  1122     self->ResetDownstreamState();
  1123     return NS_OK;
  1126   if (!associatedStream->Origin().Equals(pushedStream->Origin())) {
  1127     LOG(("SpdySession31::HandleSynStream pushed stream mismatched origin\n"));
  1128     self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM);
  1129     self->ResetDownstreamState();
  1130     return NS_OK;
  1133   if (!cache->RegisterPushedStreamSpdy31(key, pushedStream)) {
  1134     LOG(("SpdySession31::HandleSynStream registerPushedStream Failed\n"));
  1135     self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM);
  1136     self->ResetDownstreamState();
  1137     return NS_OK;
  1140   self->ResetDownstreamState();
  1141   return NS_OK;
  1144 nsresult
  1145 SpdySession31::SetInputFrameDataStream(uint32_t streamID)
  1147   mInputFrameDataStream = mStreamIDHash.Get(streamID);
  1148   if (VerifyStream(mInputFrameDataStream, streamID))
  1149     return NS_OK;
  1151   LOG(("SpdySession31::SetInputFrameDataStream failed to verify 0x%X\n",
  1152        streamID));
  1153   mInputFrameDataStream = nullptr;
  1154   return NS_ERROR_UNEXPECTED;
  1157 nsresult
  1158 SpdySession31::HandleSynReply(SpdySession31 *self)
  1160   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_SYN_REPLY);
  1162   if (self->mInputFrameDataSize < 4) {
  1163     LOG3(("SpdySession31::HandleSynReply %p SYN REPLY too short data=%d",
  1164           self, self->mInputFrameDataSize));
  1165     // A framing error is a session wide error that cannot be recovered
  1166     return NS_ERROR_ILLEGAL_VALUE;
  1169   LOG3(("SpdySession31::HandleSynReply %p lookup via streamID in syn_reply.\n",
  1170         self));
  1171   uint32_t streamID =
  1172     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
  1173   nsresult rv = self->SetInputFrameDataStream(streamID);
  1174   if (NS_FAILED(rv))
  1175     return rv;
  1177   if (!self->mInputFrameDataStream) {
  1178     // Cannot find stream. We can continue the SPDY session, but we need to
  1179     // uncompress the header block to maintain the correct compression context
  1181     LOG3(("SpdySession31::HandleSynReply %p lookup streamID in syn_reply "
  1182           "0x%X failed. NextStreamID = 0x%X\n",
  1183           self, streamID, self->mNextStreamID));
  1185     if (streamID >= self->mNextStreamID)
  1186       self->GenerateRstStream(RST_INVALID_STREAM, streamID);
  1188     rv = self->UncompressAndDiscard(12, self->mInputFrameDataSize - 4);
  1189     if (NS_FAILED(rv)) {
  1190       LOG(("SpdySession31::HandleSynReply uncompress failed\n"));
  1191       // this is fatal to the session
  1192       return rv;
  1195     self->ResetDownstreamState();
  1196     return NS_OK;
  1199   // Uncompress the headers into a stream specific buffer, leaving them in
  1200   // spdy format for the time being. Make certain to do this
  1201   // step before any error handling that might abort the stream but not
  1202   // the session becuase the session compression context will become
  1203   // inconsistent if all of the compressed data is not processed.
  1204   rv = self->mInputFrameDataStream->Uncompress(&self->mDownstreamZlib,
  1205                                                self->mInputFrameBuffer + 12,
  1206                                                self->mInputFrameDataSize - 4);
  1208   if (NS_FAILED(rv)) {
  1209     LOG(("SpdySession31::HandleSynReply uncompress failed\n"));
  1210     return rv;
  1213   if (self->mInputFrameDataStream->GetFullyOpen()) {
  1214     // "If an endpoint receives multiple SYN_REPLY frames for the same active
  1215     // stream ID, it MUST issue a stream error (Section 2.4.2) with the error
  1216     // code STREAM_IN_USE."
  1217     //
  1218     // "STREAM_ALREADY_CLOSED. The endpoint received a data or SYN_REPLY
  1219     // frame for a stream which is half closed."
  1220     //
  1221     // If the stream is open then just RST_STREAM with STREAM_IN_USE
  1222     // If the stream is half closed then RST_STREAM with STREAM_ALREADY_CLOSED
  1223     // abort the session
  1224     //
  1225     LOG3(("SpdySession31::HandleSynReply %p dup SYN_REPLY for 0x%X"
  1226           " recvdfin=%d", self, self->mInputFrameDataStream->StreamID(),
  1227           self->mInputFrameDataStream->RecvdFin()));
  1229     self->CleanupStream(self->mInputFrameDataStream, NS_ERROR_ALREADY_OPENED,
  1230                         self->mInputFrameDataStream->RecvdFin() ?
  1231                         RST_STREAM_ALREADY_CLOSED : RST_STREAM_IN_USE);
  1232     self->ResetDownstreamState();
  1233     return NS_OK;
  1235   self->mInputFrameDataStream->SetFullyOpen();
  1237   self->mInputFrameDataLast = self->mInputFrameBuffer[4] & kFlag_Data_FIN;
  1238   self->mInputFrameDataStream->UpdateTransportReadEvents(self->mInputFrameDataSize);
  1239   self->mLastDataReadEpoch = self->mLastReadEpoch;
  1241   if (self->mInputFrameBuffer[4] & ~kFlag_Data_FIN) {
  1242     LOG3(("SynReply %p had undefined flag set 0x%X\n", self, streamID));
  1243     self->CleanupStream(self->mInputFrameDataStream, NS_ERROR_ILLEGAL_VALUE,
  1244                         RST_PROTOCOL_ERROR);
  1245     self->ResetDownstreamState();
  1246     return NS_OK;
  1249   if (!self->mInputFrameDataLast) {
  1250     // don't process the headers yet as there could be more coming from HEADERS
  1251     // frames
  1252     self->ResetDownstreamState();
  1253     return NS_OK;
  1256   rv = self->ResponseHeadersComplete();
  1257   if (rv == NS_ERROR_ILLEGAL_VALUE) {
  1258     LOG3(("SpdySession31::HandleSynReply %p PROTOCOL_ERROR detected 0x%X\n",
  1259           self, streamID));
  1260     self->CleanupStream(self->mInputFrameDataStream, rv, RST_PROTOCOL_ERROR);
  1261     self->ResetDownstreamState();
  1262     rv = NS_OK;
  1264   return rv;
  1267 // ResponseHeadersComplete() returns NS_ERROR_ILLEGAL_VALUE when the stream
  1268 // should be reset with a PROTOCOL_ERROR, NS_OK when the SYN_REPLY was
  1269 // fine, and any other error is fatal to the session.
  1270 nsresult
  1271 SpdySession31::ResponseHeadersComplete()
  1273   LOG3(("SpdySession31::ResponseHeadersComplete %p for 0x%X fin=%d",
  1274         this, mInputFrameDataStream->StreamID(), mInputFrameDataLast));
  1276   // The spdystream needs to see flattened http headers
  1277   // Uncompressed spdy format headers currently live in
  1278   // SpdyStream31::mDecompressBuffer - convert that to HTTP format in
  1279   // mFlatHTTPResponseHeaders via ConvertHeaders()
  1281   mFlatHTTPResponseHeadersOut = 0;
  1282   nsresult rv = mInputFrameDataStream->ConvertHeaders(mFlatHTTPResponseHeaders);
  1283   if (NS_FAILED(rv))
  1284     return rv;
  1286   ChangeDownstreamState(PROCESSING_COMPLETE_HEADERS);
  1287   return NS_OK;
  1290 nsresult
  1291 SpdySession31::HandleRstStream(SpdySession31 *self)
  1293   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_RST_STREAM);
  1295   if (self->mInputFrameDataSize != 8) {
  1296     LOG3(("SpdySession31::HandleRstStream %p RST_STREAM wrong length data=%d",
  1297           self, self->mInputFrameDataSize));
  1298     return NS_ERROR_ILLEGAL_VALUE;
  1301   uint8_t flags = reinterpret_cast<uint8_t *>(self->mInputFrameBuffer.get())[4];
  1303   uint32_t streamID =
  1304     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
  1306   self->mDownstreamRstReason =
  1307     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[3]);
  1309   LOG3(("SpdySession31::HandleRstStream %p RST_STREAM Reason Code %u ID %x "
  1310         "flags %x", self, self->mDownstreamRstReason, streamID, flags));
  1312   if (flags != 0) {
  1313     LOG3(("SpdySession31::HandleRstStream %p RST_STREAM with flags is illegal",
  1314           self));
  1315     return NS_ERROR_ILLEGAL_VALUE;
  1318   if (self->mDownstreamRstReason == RST_INVALID_STREAM ||
  1319       self->mDownstreamRstReason == RST_STREAM_IN_USE ||
  1320       self->mDownstreamRstReason == RST_FLOW_CONTROL_ERROR) {
  1321     // basically just ignore this
  1322     LOG3(("SpdySession31::HandleRstStream %p No Reset Processing Needed.\n"));
  1323     self->ResetDownstreamState();
  1324     return NS_OK;
  1327   nsresult rv = self->SetInputFrameDataStream(streamID);
  1329   if (!self->mInputFrameDataStream) {
  1330     if (NS_FAILED(rv))
  1331       LOG(("SpdySession31::HandleRstStream %p lookup streamID for RST Frame "
  1332            "0x%X failed reason = %d :: VerifyStream Failed\n", self, streamID,
  1333            self->mDownstreamRstReason));
  1335     LOG3(("SpdySession31::HandleRstStream %p lookup streamID for RST Frame "
  1336           "0x%X failed reason = %d", self, streamID,
  1337           self->mDownstreamRstReason));
  1338     return NS_ERROR_ILLEGAL_VALUE;
  1341   self->ChangeDownstreamState(PROCESSING_CONTROL_RST_STREAM);
  1342   return NS_OK;
  1345 PLDHashOperator
  1346 SpdySession31::UpdateServerRwinEnumerator(nsAHttpTransaction *key,
  1347                                             nsAutoPtr<SpdyStream31> &stream,
  1348                                             void *closure)
  1350   int32_t delta = *(static_cast<int32_t *>(closure));
  1351   stream->UpdateRemoteWindow(delta);
  1352   return PL_DHASH_NEXT;
  1355 nsresult
  1356 SpdySession31::HandleSettings(SpdySession31 *self)
  1358   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_SETTINGS);
  1360   if (self->mInputFrameDataSize < 4) {
  1361     LOG3(("SpdySession31::HandleSettings %p SETTINGS wrong length data=%d",
  1362           self, self->mInputFrameDataSize));
  1363     return NS_ERROR_ILLEGAL_VALUE;
  1366   uint32_t numEntries =
  1367     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
  1369   // Ensure frame is large enough for supplied number of entries
  1370   // Each entry is 8 bytes, frame data is reduced by 4 to account for
  1371   // the NumEntries value.
  1372   if ((self->mInputFrameDataSize - 4) < (numEntries * 8)) {
  1373     LOG3(("SpdySession31::HandleSettings %p SETTINGS wrong length data=%d",
  1374           self, self->mInputFrameDataSize));
  1375     return NS_ERROR_ILLEGAL_VALUE;
  1378   LOG3(("SpdySession31::HandleSettings %p SETTINGS Control Frame with %d entries",
  1379         self, numEntries));
  1381   for (uint32_t index = 0; index < numEntries; ++index) {
  1382     unsigned char *setting = reinterpret_cast<unsigned char *>
  1383       (self->mInputFrameBuffer.get()) + 12 + index * 8;
  1385     uint32_t flags = setting[0];
  1386     uint32_t id = PR_ntohl(reinterpret_cast<uint32_t *>(setting)[0]) & 0xffffff;
  1387     uint32_t value =  PR_ntohl(reinterpret_cast<uint32_t *>(setting)[1]);
  1389     LOG3(("Settings ID %d, Flags %X, Value %d", id, flags, value));
  1391     switch (id)
  1393     case SETTINGS_TYPE_UPLOAD_BW:
  1394       Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_UL_BW, value);
  1395       break;
  1397     case SETTINGS_TYPE_DOWNLOAD_BW:
  1398       Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_DL_BW, value);
  1399       break;
  1401     case SETTINGS_TYPE_RTT:
  1402       Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_RTT, value);
  1403       break;
  1405     case SETTINGS_TYPE_MAX_CONCURRENT:
  1406       self->mMaxConcurrent = value;
  1407       Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value);
  1408       break;
  1410     case SETTINGS_TYPE_CWND:
  1411       if (flags & PERSIST_VALUE)
  1413         nsRefPtr<nsHttpConnectionInfo> ci;
  1414         self->GetConnectionInfo(getter_AddRefs(ci));
  1415         if (ci)
  1416           gHttpHandler->ConnMgr()->ReportSpdyCWNDSetting(ci, value);
  1418       Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_CWND, value);
  1419       break;
  1421     case SETTINGS_TYPE_DOWNLOAD_RETRANS_RATE:
  1422       Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_RETRANS, value);
  1423       break;
  1425     case SETTINGS_TYPE_INITIAL_WINDOW:
  1426       Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_IW, value >> 10);
  1428         int32_t delta = value - self->mServerInitialStreamWindow;
  1429         self->mServerInitialStreamWindow = value;
  1431         // do not use SETTINGS to adjust the session window.
  1433         // we need to add the delta to all open streams (delta can be negative)
  1434         self->mStreamTransactionHash.Enumerate(UpdateServerRwinEnumerator,
  1435                                                &delta);
  1437       break;
  1439     default:
  1440       break;
  1445   self->ResetDownstreamState();
  1446   return NS_OK;
  1449 nsresult
  1450 SpdySession31::HandleNoop(SpdySession31 *self)
  1452   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_NOOP);
  1454   // Should not be receiving noop frames in spdy/3, so we'll just
  1455   // make a log and ignore it
  1457   LOG3(("SpdySession31::HandleNoop %p NOP.", self));
  1459   self->ResetDownstreamState();
  1460   return NS_OK;
  1463 nsresult
  1464 SpdySession31::HandlePing(SpdySession31 *self)
  1466   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_PING);
  1468   if (self->mInputFrameDataSize != 4) {
  1469     LOG3(("SpdySession31::HandlePing %p PING had wrong amount of data %d",
  1470           self, self->mInputFrameDataSize));
  1471     return NS_ERROR_ILLEGAL_VALUE;
  1474   uint32_t pingID =
  1475     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
  1477   LOG3(("SpdySession31::HandlePing %p PING ID 0x%X.", self, pingID));
  1479   if (pingID & 0x01) {
  1480     // presumably a reply to our timeout ping
  1481     self->mPingSentEpoch = 0;
  1483   else {
  1484     // Servers initiate even numbered pings, go ahead and echo it back
  1485     self->GeneratePing(pingID);
  1488   self->ResetDownstreamState();
  1489   return NS_OK;
  1492 nsresult
  1493 SpdySession31::HandleGoAway(SpdySession31 *self)
  1495   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_GOAWAY);
  1497   if (self->mInputFrameDataSize != 8) {
  1498     LOG3(("SpdySession31::HandleGoAway %p GOAWAY had wrong amount of data %d",
  1499           self, self->mInputFrameDataSize));
  1500     return NS_ERROR_ILLEGAL_VALUE;
  1503   self->mShouldGoAway = true;
  1504   self->mGoAwayID =
  1505     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
  1506   self->mCleanShutdown = true;
  1508   // Find streams greater than the last-good ID and mark them for deletion
  1509   // in the mGoAwayStreamsToRestart queue with the GoAwayEnumerator. The
  1510   // underlying transaction can be restarted.
  1511   self->mStreamTransactionHash.Enumerate(GoAwayEnumerator, self);
  1513   // Process the streams marked for deletion and restart.
  1514   uint32_t size = self->mGoAwayStreamsToRestart.GetSize();
  1515   for (uint32_t count = 0; count < size; ++count) {
  1516     SpdyStream31 *stream =
  1517       static_cast<SpdyStream31 *>(self->mGoAwayStreamsToRestart.PopFront());
  1519     self->CloseStream(stream, NS_ERROR_NET_RESET);
  1520     if (stream->HasRegisteredID())
  1521       self->mStreamIDHash.Remove(stream->StreamID());
  1522     self->mStreamTransactionHash.Remove(stream->Transaction());
  1525   // Queued streams can also be deleted from this session and restarted
  1526   // in another one. (they were never sent on the network so they implicitly
  1527   // are not covered by the last-good id.
  1528   size = self->mQueuedStreams.GetSize();
  1529   for (uint32_t count = 0; count < size; ++count) {
  1530     SpdyStream31 *stream =
  1531       static_cast<SpdyStream31 *>(self->mQueuedStreams.PopFront());
  1532     self->CloseStream(stream, NS_ERROR_NET_RESET);
  1533     self->mStreamTransactionHash.Remove(stream->Transaction());
  1536   LOG3(("SpdySession31::HandleGoAway %p GOAWAY Last-Good-ID 0x%X status 0x%X "
  1537         "live streams=%d\n", self, self->mGoAwayID,
  1538         PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[3]),
  1539         self->mStreamTransactionHash.Count()));
  1541   self->ResetDownstreamState();
  1542   return NS_OK;
  1545 nsresult
  1546 SpdySession31::HandleHeaders(SpdySession31 *self)
  1548   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_HEADERS);
  1550   if (self->mInputFrameDataSize < 4) {
  1551     LOG3(("SpdySession31::HandleHeaders %p HEADERS had wrong amount of data %d",
  1552           self, self->mInputFrameDataSize));
  1553     return NS_ERROR_ILLEGAL_VALUE;
  1556   uint32_t streamID =
  1557     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
  1558   LOG3(("SpdySession31::HandleHeaders %p HEADERS for Stream 0x%X.\n",
  1559         self, streamID));
  1560   nsresult rv = self->SetInputFrameDataStream(streamID);
  1561   if (NS_FAILED(rv))
  1562     return rv;
  1564   if (!self->mInputFrameDataStream) {
  1565     LOG3(("SpdySession31::HandleHeaders %p lookup streamID 0x%X failed.\n",
  1566           self, streamID));
  1567     if (streamID >= self->mNextStreamID)
  1568       self->GenerateRstStream(RST_INVALID_STREAM, streamID);
  1570     rv = self->UncompressAndDiscard(12, self->mInputFrameDataSize - 4);
  1571     if (NS_FAILED(rv)) {
  1572       LOG(("SpdySession31::HandleHeaders uncompress failed\n"));
  1573       // this is fatal to the session
  1574       return rv;
  1576     self->ResetDownstreamState();
  1577     return NS_OK;
  1580   // Uncompress the headers into local buffers in the SpdyStream, leaving
  1581   // them in spdy format for the time being. Make certain to do this
  1582   // step before any error handling that might abort the stream but not
  1583   // the session becuase the session compression context will become
  1584   // inconsistent if all of the compressed data is not processed.
  1585   rv = self->mInputFrameDataStream->Uncompress(&self->mDownstreamZlib,
  1586                                                self->mInputFrameBuffer + 12,
  1587                                                self->mInputFrameDataSize - 4);
  1588   if (NS_FAILED(rv)) {
  1589     LOG(("SpdySession31::HandleHeaders uncompress failed\n"));
  1590     return rv;
  1593   self->mInputFrameDataLast = self->mInputFrameBuffer[4] & kFlag_Data_FIN;
  1594   self->mInputFrameDataStream->
  1595     UpdateTransportReadEvents(self->mInputFrameDataSize);
  1596   self->mLastDataReadEpoch = self->mLastReadEpoch;
  1598   if (self->mInputFrameBuffer[4] & ~kFlag_Data_FIN) {
  1599     LOG3(("Headers %p had undefined flag set 0x%X\n", self, streamID));
  1600     self->CleanupStream(self->mInputFrameDataStream, NS_ERROR_ILLEGAL_VALUE,
  1601                         RST_PROTOCOL_ERROR);
  1602     self->ResetDownstreamState();
  1603     return NS_OK;
  1606   if (!self->mInputFrameDataLast) {
  1607     // don't process the headers yet as there could be more HEADERS frames
  1608     self->ResetDownstreamState();
  1609     return NS_OK;
  1612   rv = self->ResponseHeadersComplete();
  1613   if (rv == NS_ERROR_ILLEGAL_VALUE) {
  1614     LOG3(("SpdySession31::HanndleHeaders %p PROTOCOL_ERROR detected 0x%X\n",
  1615           self, streamID));
  1616     self->CleanupStream(self->mInputFrameDataStream, rv, RST_PROTOCOL_ERROR);
  1617     self->ResetDownstreamState();
  1618     rv = NS_OK;
  1620   return rv;
  1623 PLDHashOperator
  1624 SpdySession31::RestartBlockedOnRwinEnumerator(nsAHttpTransaction *key,
  1625                                               nsAutoPtr<SpdyStream31> &stream,
  1626                                               void *closure)
  1628   SpdySession31 *self = static_cast<SpdySession31 *>(closure);
  1629   MOZ_ASSERT(self->mRemoteSessionWindow > 0);
  1631   if (!stream->BlockedOnRwin() || stream->RemoteWindow() <= 0)
  1632     return PL_DHASH_NEXT;
  1634   self->mReadyForWrite.Push(stream);
  1635   self->SetWriteCallbacks();
  1636   return PL_DHASH_NEXT;
  1639 nsresult
  1640 SpdySession31::HandleWindowUpdate(SpdySession31 *self)
  1642   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_WINDOW_UPDATE);
  1644   if (self->mInputFrameDataSize < 8) {
  1645     LOG3(("SpdySession31::HandleWindowUpdate %p Window Update wrong length %d\n",
  1646           self, self->mInputFrameDataSize));
  1647     return NS_ERROR_ILLEGAL_VALUE;
  1650   uint32_t delta =
  1651     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[3]);
  1652   delta &= 0x7fffffff;
  1653   uint32_t streamID =
  1654     PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
  1655   streamID &= 0x7fffffff;
  1657   LOG3(("SpdySession31::HandleWindowUpdate %p len=%d for Stream 0x%X.\n",
  1658         self, delta, streamID));
  1660   // ID of 0 is a session window update
  1661   if (streamID) {
  1662     nsresult rv = self->SetInputFrameDataStream(streamID);
  1663     if (NS_FAILED(rv))
  1664       return rv;
  1666     if (!self->mInputFrameDataStream) {
  1667       LOG3(("SpdySession31::HandleWindowUpdate %p lookup streamID 0x%X failed.\n",
  1668             self, streamID));
  1669       if (streamID >= self->mNextStreamID)
  1670         self->GenerateRstStream(RST_INVALID_STREAM, streamID);
  1671       self->ResetDownstreamState();
  1672       return NS_OK;
  1675     self->mInputFrameDataStream->UpdateRemoteWindow(delta);
  1676   } else {
  1677     int64_t oldRemoteWindow = self->mRemoteSessionWindow;
  1678     self->mRemoteSessionWindow += delta;
  1679     if ((oldRemoteWindow <= 0) && (self->mRemoteSessionWindow > 0)) {
  1680       LOG3(("SpdySession31::HandleWindowUpdate %p restart session window\n",
  1681             self));
  1682       self->mStreamTransactionHash.Enumerate(RestartBlockedOnRwinEnumerator, self);
  1686   self->ResetDownstreamState();
  1687   return NS_OK;
  1690 nsresult
  1691 SpdySession31::HandleCredential(SpdySession31 *self)
  1693   MOZ_ASSERT(self->mFrameControlType == CONTROL_TYPE_CREDENTIAL);
  1695   // These aren't used yet. Just ignore the frame.
  1697   LOG3(("SpdySession31::HandleCredential %p NOP.", self));
  1699   self->ResetDownstreamState();
  1700   return NS_OK;
  1703 //-----------------------------------------------------------------------------
  1704 // nsAHttpTransaction. It is expected that nsHttpConnection is the caller
  1705 // of these methods
  1706 //-----------------------------------------------------------------------------
  1708 void
  1709 SpdySession31::OnTransportStatus(nsITransport* aTransport,
  1710                                  nsresult aStatus,
  1711                                  uint64_t aProgress)
  1713   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  1715   switch (aStatus) {
  1716     // These should appear only once, deliver to the first
  1717     // transaction on the session.
  1718   case NS_NET_STATUS_RESOLVING_HOST:
  1719   case NS_NET_STATUS_RESOLVED_HOST:
  1720   case NS_NET_STATUS_CONNECTING_TO:
  1721   case NS_NET_STATUS_CONNECTED_TO:
  1723     SpdyStream31 *target = mStreamIDHash.Get(1);
  1724     nsAHttpTransaction *transaction = target ? target->Transaction() : nullptr;
  1725     if (transaction)
  1726       transaction->OnTransportStatus(aTransport, aStatus, aProgress);
  1727     break;
  1730   default:
  1731     // The other transport events are ignored here because there is no good
  1732     // way to map them to the right transaction in spdy. Instead, the events
  1733     // are generated again from the spdy code and passed directly to the
  1734     // correct transaction.
  1736     // NS_NET_STATUS_SENDING_TO:
  1737     // This is generated by the socket transport when (part) of
  1738     // a transaction is written out
  1739     //
  1740     // There is no good way to map it to the right transaction in spdy,
  1741     // so it is ignored here and generated separately when the SYN_STREAM
  1742     // is sent from SpdyStream31::TransmitFrame
  1744     // NS_NET_STATUS_WAITING_FOR:
  1745     // Created by nsHttpConnection when the request has been totally sent.
  1746     // There is no good way to map it to the right transaction in spdy,
  1747     // so it is ignored here and generated separately when the same
  1748     // condition is complete in SpdyStream31 when there is no more
  1749     // request body left to be transmitted.
  1751     // NS_NET_STATUS_RECEIVING_FROM
  1752     // Generated in spdysession whenever we read a data frame or a syn_reply
  1753     // that can be attributed to a particular stream/transaction
  1755     break;
  1759 // ReadSegments() is used to write data to the network. Generally, HTTP
  1760 // request data is pulled from the approriate transaction and
  1761 // converted to SPDY data. Sometimes control data like window-update are
  1762 // generated instead.
  1764 nsresult
  1765 SpdySession31::ReadSegments(nsAHttpSegmentReader *reader,
  1766                             uint32_t count,
  1767                             uint32_t *countRead)
  1769   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  1771   MOZ_ASSERT(!mSegmentReader || !reader || (mSegmentReader == reader),
  1772              "Inconsistent Write Function Callback");
  1774   if (reader)
  1775     mSegmentReader = reader;
  1777   nsresult rv;
  1778   *countRead = 0;
  1780   LOG3(("SpdySession31::ReadSegments %p", this));
  1782   SpdyStream31 *stream = static_cast<SpdyStream31 *>(mReadyForWrite.PopFront());
  1783   if (!stream) {
  1784     LOG3(("SpdySession31 %p could not identify a stream to write; suspending.",
  1785           this));
  1786     FlushOutputQueue();
  1787     SetWriteCallbacks();
  1788     return NS_BASE_STREAM_WOULD_BLOCK;
  1791   LOG3(("SpdySession31 %p will write from SpdyStream31 %p 0x%X "
  1792         "block-input=%d block-output=%d\n", this, stream, stream->StreamID(),
  1793         stream->RequestBlockedOnRead(), stream->BlockedOnRwin()));
  1795   rv = stream->ReadSegments(this, count, countRead);
  1797   // Not every permutation of stream->ReadSegents produces data (and therefore
  1798   // tries to flush the output queue) - SENDING_FIN_STREAM can be an example
  1799   // of that. But we might still have old data buffered that would be good
  1800   // to flush.
  1801   FlushOutputQueue();
  1803   // Allow new server reads - that might be data or control information
  1804   // (e.g. window updates or http replies) that are responses to these writes
  1805   ResumeRecv();
  1807   if (stream->RequestBlockedOnRead()) {
  1809     // We are blocked waiting for input - either more http headers or
  1810     // any request body data. When more data from the request stream
  1811     // becomes available the httptransaction will call conn->ResumeSend().
  1813     LOG3(("SpdySession31::ReadSegments %p dealing with block on read", this));
  1815     // call readsegments again if there are other streams ready
  1816     // to run in this session
  1817     if (GetWriteQueueSize())
  1818       rv = NS_OK;
  1819     else
  1820       rv = NS_BASE_STREAM_WOULD_BLOCK;
  1821     SetWriteCallbacks();
  1822     return rv;
  1825   if (NS_FAILED(rv)) {
  1826     LOG3(("SpdySession31::ReadSegments %p returning FAIL code %X",
  1827           this, rv));
  1828     if (rv != NS_BASE_STREAM_WOULD_BLOCK)
  1829       CleanupStream(stream, rv, RST_CANCEL);
  1830     return rv;
  1833   if (*countRead > 0) {
  1834     LOG3(("SpdySession31::ReadSegments %p stream=%p countread=%d",
  1835           this, stream, *countRead));
  1836     mReadyForWrite.Push(stream);
  1837     SetWriteCallbacks();
  1838     return rv;
  1841   if (stream->BlockedOnRwin()) {
  1842     LOG3(("SpdySession31 %p will stream %p 0x%X suspended for flow control\n",
  1843           this, stream, stream->StreamID()));
  1844     return NS_BASE_STREAM_WOULD_BLOCK;
  1847   LOG3(("SpdySession31::ReadSegments %p stream=%p stream send complete",
  1848         this, stream));
  1850   // call readsegments again if there are other streams ready
  1851   // to go in this session
  1852   SetWriteCallbacks();
  1854   return rv;
  1857 // WriteSegments() is used to read data off the socket. Generally this is
  1858 // just the SPDY frame header and from there the appropriate SPDYStream
  1859 // is identified from the Stream-ID. The http transaction associated with
  1860 // that read then pulls in the data directly, which it will feed to
  1861 // OnWriteSegment(). That function will gateway it into http and feed
  1862 // it to the appropriate transaction.
  1864 // we call writer->OnWriteSegment via NetworkRead() to get a spdy header..
  1865 // and decide if it is data or control.. if it is control, just deal with it.
  1866 // if it is data, identify the spdy stream
  1867 // call stream->WriteSegments which can call this::OnWriteSegment to get the
  1868 // data. It always gets full frames if they are part of the stream
  1870 nsresult
  1871 SpdySession31::WriteSegments(nsAHttpSegmentWriter *writer,
  1872                              uint32_t count,
  1873                              uint32_t *countWritten)
  1875   typedef nsresult  (*Control_FX) (SpdySession31 *self);
  1876   static const Control_FX sControlFunctions[] =
  1878       nullptr,
  1879       SpdySession31::HandleSynStream,
  1880       SpdySession31::HandleSynReply,
  1881       SpdySession31::HandleRstStream,
  1882       SpdySession31::HandleSettings,
  1883       SpdySession31::HandleNoop,
  1884       SpdySession31::HandlePing,
  1885       SpdySession31::HandleGoAway,
  1886       SpdySession31::HandleHeaders,
  1887       SpdySession31::HandleWindowUpdate,
  1888       SpdySession31::HandleCredential
  1889   };
  1891   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  1893   nsresult rv;
  1894   *countWritten = 0;
  1896   if (mClosed)
  1897     return NS_ERROR_FAILURE;
  1899   SetWriteCallbacks();
  1901   // If there are http transactions attached to a push stream with filled buffers
  1902   // trigger that data pump here. This only reads from buffers (not the network)
  1903   // so mDownstreamState doesn't matter.
  1904   SpdyStream31 *pushConnectedStream =
  1905     static_cast<SpdyStream31 *>(mReadyForRead.PopFront());
  1906   if (pushConnectedStream) {
  1907     LOG3(("SpdySession31::WriteSegments %p processing pushed stream 0x%X\n",
  1908           this, pushConnectedStream->StreamID()));
  1909     mSegmentWriter = writer;
  1910     rv = pushConnectedStream->WriteSegments(this, count, countWritten);
  1911     mSegmentWriter = nullptr;
  1913     // The pipe in nsHttpTransaction rewrites CLOSED error codes into OK
  1914     // so we need this check to determine the truth.
  1915     if (NS_SUCCEEDED(rv) && !*countWritten &&
  1916         pushConnectedStream->PushSource() &&
  1917         pushConnectedStream->PushSource()->GetPushComplete()) {
  1918       rv = NS_BASE_STREAM_CLOSED;
  1921     if (rv == NS_BASE_STREAM_CLOSED) {
  1922       CleanupStream(pushConnectedStream, NS_OK, RST_CANCEL);
  1923       rv = NS_OK;
  1926     // if we return OK to nsHttpConnection it will use mSocketInCondition
  1927     // to determine whether to schedule more reads, incorrectly
  1928     // assuming that nsHttpConnection::OnSocketWrite() was called.
  1929     if (NS_SUCCEEDED(rv) || rv == NS_BASE_STREAM_WOULD_BLOCK) {
  1930       rv = NS_BASE_STREAM_WOULD_BLOCK;
  1931       ResumeRecv();
  1934     return rv;
  1937   // We buffer all control frames and act on them in this layer.
  1938   // We buffer the first 8 bytes of data frames (the header) but
  1939   // the actual data is passed through unprocessed.
  1941   if (mDownstreamState == BUFFERING_FRAME_HEADER) {
  1942     // The first 8 bytes of every frame is header information that
  1943     // we are going to want to strip before passing to http. That is
  1944     // true of both control and data packets.
  1946     MOZ_ASSERT(mInputFrameBufferUsed < 8,
  1947                "Frame Buffer Used Too Large for State");
  1949     rv = NetworkRead(writer, mInputFrameBuffer + mInputFrameBufferUsed,
  1950                      8 - mInputFrameBufferUsed, countWritten);
  1952     if (NS_FAILED(rv)) {
  1953       LOG3(("SpdySession31 %p buffering frame header read failure %x\n",
  1954             this, rv));
  1955       // maybe just blocked reading from network
  1956       if (rv == NS_BASE_STREAM_WOULD_BLOCK)
  1957         rv = NS_OK;
  1958       return rv;
  1961     LogIO(this, nullptr, "Reading Frame Header",
  1962           mInputFrameBuffer + mInputFrameBufferUsed, *countWritten);
  1964     mInputFrameBufferUsed += *countWritten;
  1966     if (mInputFrameBufferUsed < 8)
  1968       LOG3(("SpdySession31::WriteSegments %p "
  1969             "BUFFERING FRAME HEADER incomplete size=%d",
  1970             this, mInputFrameBufferUsed));
  1971       return rv;
  1974     // For both control and data frames the second 32 bit word of the header
  1975     // is 8-flags, 24-length. (network byte order)
  1976     mInputFrameDataSize =
  1977       PR_ntohl(reinterpret_cast<uint32_t *>(mInputFrameBuffer.get())[1]);
  1978     mInputFrameDataSize &= 0x00ffffff;
  1979     mInputFrameDataRead = 0;
  1981     if (mInputFrameBuffer[0] & kFlag_Control) {
  1982       EnsureBuffer(mInputFrameBuffer, mInputFrameDataSize + 8, 8,
  1983                    mInputFrameBufferSize);
  1984       ChangeDownstreamState(BUFFERING_CONTROL_FRAME);
  1986       // The first 32 bit word of the header is
  1987       // 1 ctrl - 15 version - 16 type
  1988       uint16_t version =
  1989         PR_ntohs(reinterpret_cast<uint16_t *>(mInputFrameBuffer.get())[0]);
  1990       version &= 0x7fff;
  1992       mFrameControlType =
  1993         PR_ntohs(reinterpret_cast<uint16_t *>(mInputFrameBuffer.get())[1]);
  1995       LOG3(("SpdySession31::WriteSegments %p - Control Frame Identified "
  1996             "type %d version %d data len %d",
  1997             this, mFrameControlType, version, mInputFrameDataSize));
  1999       if (mFrameControlType >= CONTROL_TYPE_LAST ||
  2000           mFrameControlType <= CONTROL_TYPE_FIRST)
  2001         return NS_ERROR_ILLEGAL_VALUE;
  2003       if (version != kVersion)
  2004         return NS_ERROR_ILLEGAL_VALUE;
  2006     else {
  2007       ChangeDownstreamState(PROCESSING_DATA_FRAME);
  2009       Telemetry::Accumulate(Telemetry::SPDY_CHUNK_RECVD,
  2010                             mInputFrameDataSize >> 10);
  2011       mLastDataReadEpoch = mLastReadEpoch;
  2013       uint32_t streamID =
  2014         PR_ntohl(reinterpret_cast<uint32_t *>(mInputFrameBuffer.get())[0]);
  2015       rv = SetInputFrameDataStream(streamID);
  2016       if (NS_FAILED(rv)) {
  2017         LOG(("SpdySession31::WriteSegments %p lookup streamID 0x%X failed. "
  2018              "probably due to verification.\n", this, streamID));
  2019         return rv;
  2021       if (!mInputFrameDataStream) {
  2022         LOG3(("SpdySession31::WriteSegments %p lookup streamID 0x%X failed. "
  2023               "Next = 0x%X", this, streamID, mNextStreamID));
  2024         if (streamID >= mNextStreamID)
  2025           GenerateRstStream(RST_INVALID_STREAM, streamID);
  2026         ChangeDownstreamState(DISCARDING_DATA_FRAME);
  2028       else if (mInputFrameDataStream->RecvdFin()) {
  2029         LOG3(("SpdySession31::WriteSegments %p streamID 0x%X "
  2030               "Data arrived for already server closed stream.\n",
  2031               this, streamID));
  2032         GenerateRstStream(RST_STREAM_ALREADY_CLOSED, streamID);
  2033         ChangeDownstreamState(DISCARDING_DATA_FRAME);
  2035       else if (!mInputFrameDataStream->RecvdData()) {
  2036         LOG3(("SpdySession31 %p First Data Frame Flushes Headers stream 0x%X\n",
  2037               this, streamID));
  2039         mInputFrameDataStream->SetRecvdData(true);
  2040         rv = ResponseHeadersComplete();
  2041         if (rv == NS_ERROR_ILLEGAL_VALUE) {
  2042           LOG3(("SpdySession31 %p PROTOCOL_ERROR detected 0x%X\n",
  2043                 this, streamID));
  2044           CleanupStream(mInputFrameDataStream, rv, RST_PROTOCOL_ERROR);
  2045           ChangeDownstreamState(DISCARDING_DATA_FRAME);
  2047         else {
  2048           mDataPending = true;
  2052       mInputFrameDataLast = (mInputFrameBuffer[4] & kFlag_Data_FIN);
  2053       LOG3(("Start Processing Data Frame. "
  2054             "Session=%p Stream ID 0x%X Stream Ptr %p Fin=%d Len=%d",
  2055             this, streamID, mInputFrameDataStream, mInputFrameDataLast,
  2056             mInputFrameDataSize));
  2057       UpdateLocalRwin(mInputFrameDataStream, mInputFrameDataSize);
  2061   if (mDownstreamState == PROCESSING_CONTROL_RST_STREAM) {
  2062     if (mDownstreamRstReason == RST_REFUSED_STREAM)
  2063       rv = NS_ERROR_NET_RESET;            //we can retry this 100% safely
  2064     else if (mDownstreamRstReason == RST_CANCEL ||
  2065              mDownstreamRstReason == RST_PROTOCOL_ERROR ||
  2066              mDownstreamRstReason == RST_INTERNAL_ERROR ||
  2067              mDownstreamRstReason == RST_UNSUPPORTED_VERSION)
  2068       rv = NS_ERROR_NET_INTERRUPT;
  2069     else if (mDownstreamRstReason == RST_FRAME_TOO_LARGE)
  2070       rv = NS_ERROR_FILE_TOO_BIG;
  2071     else
  2072       rv = NS_ERROR_ILLEGAL_VALUE;
  2074     if (mDownstreamRstReason != RST_REFUSED_STREAM &&
  2075         mDownstreamRstReason != RST_CANCEL)
  2076       mShouldGoAway = true;
  2078     // mInputFrameDataStream is reset by ChangeDownstreamState
  2079     SpdyStream31 *stream = mInputFrameDataStream;
  2080     ResetDownstreamState();
  2081     LOG3(("SpdySession31::WriteSegments cleanup stream on recv of rst "
  2082           "session=%p stream=%p 0x%X\n", this, stream,
  2083           stream ? stream->StreamID() : 0));
  2084     CleanupStream(stream, rv, RST_CANCEL);
  2085     return NS_OK;
  2088   if (mDownstreamState == PROCESSING_DATA_FRAME ||
  2089       mDownstreamState == PROCESSING_COMPLETE_HEADERS) {
  2091     // The cleanup stream should only be set while stream->WriteSegments is
  2092     // on the stack and then cleaned up in this code block afterwards.
  2093     MOZ_ASSERT(!mNeedsCleanup, "cleanup stream set unexpectedly");
  2094     mNeedsCleanup = nullptr;                     /* just in case */
  2096     mSegmentWriter = writer;
  2097     rv = mInputFrameDataStream->WriteSegments(this, count, countWritten);
  2098     mSegmentWriter = nullptr;
  2100     mLastDataReadEpoch = mLastReadEpoch;
  2102     if (SoftStreamError(rv)) {
  2103       // This will happen when the transaction figures out it is EOF, generally
  2104       // due to a content-length match being made. Return OK from this function
  2105       // otherwise the whole session would be torn down.
  2106       SpdyStream31 *stream = mInputFrameDataStream;
  2108       // if we were doing PROCESSING_COMPLETE_HEADERS need to pop the state
  2109       // back to PROCESSING_DATA_FRAME where we came from
  2110       mDownstreamState = PROCESSING_DATA_FRAME;
  2112       if (mInputFrameDataRead == mInputFrameDataSize)
  2113         ResetDownstreamState();
  2114       LOG3(("SpdySession31::WriteSegments session=%p stream=%p 0x%X "
  2115             "needscleanup=%p. cleanup stream based on "
  2116             "stream->writeSegments returning code %X\n",
  2117             this, stream, stream ? stream->StreamID() : 0,
  2118             mNeedsCleanup, rv));
  2119       CleanupStream(stream, NS_OK, RST_CANCEL);
  2120       MOZ_ASSERT(!mNeedsCleanup, "double cleanup out of data frame");
  2121       mNeedsCleanup = nullptr;                     /* just in case */
  2122       return NS_OK;
  2125     if (mNeedsCleanup) {
  2126       LOG3(("SpdySession31::WriteSegments session=%p stream=%p 0x%X "
  2127             "cleanup stream based on mNeedsCleanup.\n",
  2128             this, mNeedsCleanup, mNeedsCleanup ? mNeedsCleanup->StreamID() : 0));
  2129       CleanupStream(mNeedsCleanup, NS_OK, RST_CANCEL);
  2130       mNeedsCleanup = nullptr;
  2133     if (NS_FAILED(rv)) {
  2134       LOG3(("SpdySession31 %p data frame read failure %x\n", this, rv));
  2135       // maybe just blocked reading from network
  2136       if (rv == NS_BASE_STREAM_WOULD_BLOCK)
  2137         rv = NS_OK;
  2140     return rv;
  2143   if (mDownstreamState == DISCARDING_DATA_FRAME) {
  2144     char trash[4096];
  2145     uint32_t count = std::min(4096U, mInputFrameDataSize - mInputFrameDataRead);
  2147     if (!count) {
  2148       ResetDownstreamState();
  2149       ResumeRecv();
  2150       return NS_BASE_STREAM_WOULD_BLOCK;
  2153     rv = NetworkRead(writer, trash, count, countWritten);
  2155     if (NS_FAILED(rv)) {
  2156       LOG3(("SpdySession31 %p discard frame read failure %x\n", this, rv));
  2157       // maybe just blocked reading from network
  2158       if (rv == NS_BASE_STREAM_WOULD_BLOCK)
  2159         rv = NS_OK;
  2160       return rv;
  2163     LogIO(this, nullptr, "Discarding Frame", trash, *countWritten);
  2165     mInputFrameDataRead += *countWritten;
  2167     if (mInputFrameDataRead == mInputFrameDataSize)
  2168       ResetDownstreamState();
  2169     return rv;
  2172   MOZ_ASSERT(mDownstreamState == BUFFERING_CONTROL_FRAME);
  2173   if (mDownstreamState != BUFFERING_CONTROL_FRAME) {
  2174     // this cannot happen
  2175     return NS_ERROR_UNEXPECTED;
  2178   MOZ_ASSERT(mInputFrameBufferUsed == 8,
  2179              "Frame Buffer Header Not Present");
  2181   rv = NetworkRead(writer, mInputFrameBuffer + 8 + mInputFrameDataRead,
  2182                    mInputFrameDataSize - mInputFrameDataRead, countWritten);
  2184   if (NS_FAILED(rv)) {
  2185     LOG3(("SpdySession31 %p buffering control frame read failure %x\n",
  2186           this, rv));
  2187     // maybe just blocked reading from network
  2188     if (rv == NS_BASE_STREAM_WOULD_BLOCK)
  2189       rv = NS_OK;
  2190     return rv;
  2193   LogIO(this, nullptr, "Reading Control Frame",
  2194         mInputFrameBuffer + 8 + mInputFrameDataRead, *countWritten);
  2196   mInputFrameDataRead += *countWritten;
  2198   if (mInputFrameDataRead != mInputFrameDataSize)
  2199     return NS_OK;
  2201   // This check is actually redundant, the control type was previously
  2202   // checked to make sure it was in range, but we will check it again
  2203   // at time of use to make sure a regression doesn't creep in.
  2204   if (mFrameControlType >= CONTROL_TYPE_LAST ||
  2205       mFrameControlType <= CONTROL_TYPE_FIRST)
  2207     MOZ_ASSERT(false, "control type out of range");
  2208     return NS_ERROR_ILLEGAL_VALUE;
  2210   rv = sControlFunctions[mFrameControlType](this);
  2212   MOZ_ASSERT(NS_FAILED(rv) ||
  2213              mDownstreamState != BUFFERING_CONTROL_FRAME,
  2214              "Control Handler returned OK but did not change state");
  2216   if (mShouldGoAway && !mStreamTransactionHash.Count())
  2217     Close(NS_OK);
  2218   return rv;
  2221 void
  2222 SpdySession31::UpdateLocalStreamWindow(SpdyStream31 *stream,
  2223                                        uint32_t bytes)
  2225   if (!stream) // this is ok - it means there was a data frame for a rst stream
  2226     return;
  2228   stream->DecrementLocalWindow(bytes);
  2230   // If this data packet was not for a valid or live stream then there
  2231   // is no reason to mess with the flow control
  2232   if (stream->RecvdFin())
  2233     return;
  2235   // Don't necessarily ack every data packet. Only do it
  2236   // after a significant amount of data.
  2237   uint64_t unacked = stream->LocalUnAcked();
  2238   int64_t  localWindow = stream->LocalWindow();
  2240   LOG3(("SpdySession31::UpdateLocalStreamWindow this=%p id=0x%X newbytes=%u "
  2241         "unacked=%llu localWindow=%lld\n",
  2242         this, stream->StreamID(), bytes, unacked, localWindow));
  2244   if (!unacked)
  2245     return;
  2247   if ((unacked < kMinimumToAck) && (localWindow > kEmergencyWindowThreshold))
  2248     return;
  2250   if (!stream->HasSink()) {
  2251     LOG3(("SpdySession31::UpdateLocalStreamWindow %p 0x%X Pushed Stream Has No Sink\n",
  2252           this, stream->StreamID()));
  2253     return;
  2256   // Generate window updates directly out of spdysession instead of the stream
  2257   // in order to avoid queue delays in getting the 'ACK' out.
  2258   uint32_t toack = (unacked <= 0x7fffffffU) ? unacked : 0x7fffffffU;
  2260   LOG3(("SpdySession31::UpdateLocalStreamWindow Ack this=%p id=0x%X acksize=%d\n",
  2261         this, stream->StreamID(), toack));
  2262   stream->IncrementLocalWindow(toack);
  2264   // room for this packet needs to be ensured before calling this function
  2265   static const uint32_t dataLen = 8;
  2266   char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
  2267   mOutputQueueUsed += 8 + dataLen;
  2268   MOZ_ASSERT(mOutputQueueUsed <= mOutputQueueSize);
  2270   memset(packet, 0, 8 + dataLen);
  2271   packet[0] = kFlag_Control;
  2272   packet[1] = kVersion;
  2273   packet[3] = CONTROL_TYPE_WINDOW_UPDATE;
  2274   packet[7] = dataLen;
  2276   uint32_t id = PR_htonl(stream->StreamID());
  2277   memcpy(packet + 8, &id, 4);
  2278   toack = PR_htonl(toack);
  2279   memcpy(packet + 12, &toack, 4);
  2281   LogIO(this, stream, "Stream Window Update", packet, 8 + dataLen);
  2282   // dont flush here, this write can commonly be coalesced with a
  2283   // session window update to immediately follow.
  2286 void
  2287 SpdySession31::UpdateLocalSessionWindow(uint32_t bytes)
  2289   if (!bytes)
  2290     return;
  2292   mLocalSessionWindow -= bytes;
  2294   LOG3(("SpdySession31::UpdateLocalSessionWindow this=%p newbytes=%u "
  2295         "localWindow=%lld\n", this, bytes, mLocalSessionWindow));
  2297   // Don't necessarily ack every data packet. Only do it
  2298   // after a significant amount of data.
  2299   if ((mLocalSessionWindow > (ASpdySession::kInitialRwin - kMinimumToAck)) &&
  2300       (mLocalSessionWindow > kEmergencyWindowThreshold))
  2301     return;
  2303   // Only send max 31 bits of window updates at a time.
  2304   uint64_t toack64 = ASpdySession::kInitialRwin - mLocalSessionWindow;
  2305   uint32_t toack = (toack64 <= 0x7fffffffU) ? toack64 : 0x7fffffffU;
  2307   LOG3(("SpdySession31::UpdateLocalSessionWindow Ack this=%p acksize=%u\n",
  2308         this, toack));
  2309   mLocalSessionWindow += toack;
  2311   // room for this packet needs to be ensured before calling this function
  2312   static const uint32_t dataLen = 8;
  2313   char *packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
  2314   mOutputQueueUsed += 8 + dataLen;
  2315   MOZ_ASSERT(mOutputQueueUsed <= mOutputQueueSize);
  2317   memset(packet, 0, 8 + dataLen);
  2318   packet[0] = kFlag_Control;
  2319   packet[1] = kVersion;
  2320   packet[3] = CONTROL_TYPE_WINDOW_UPDATE;
  2321   packet[7] = dataLen;
  2323   // packet 8-11 is ID and left at 0 for session ID
  2324   toack = PR_htonl(toack);
  2325   memcpy(packet + 12, &toack, 4);
  2327   LogIO(this, nullptr, "Session Window Update", packet, 8 + dataLen);
  2328   // dont flush here, this write can commonly be coalesced with others
  2331 void
  2332 SpdySession31::UpdateLocalRwin(SpdyStream31 *stream,
  2333                                uint32_t bytes)
  2335   // make sure there is room for 2 window updates even though
  2336   // we may not generate any.
  2337   EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + (16 *2),
  2338                mOutputQueueUsed, mOutputQueueSize);
  2340   UpdateLocalStreamWindow(stream, bytes);
  2341   UpdateLocalSessionWindow(bytes);
  2342   FlushOutputQueue();
  2345 void
  2346 SpdySession31::Close(nsresult aReason)
  2348   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  2350   if (mClosed)
  2351     return;
  2353   LOG3(("SpdySession31::Close %p %X", this, aReason));
  2355   mClosed = true;
  2357   mStreamTransactionHash.Enumerate(ShutdownEnumerator, this);
  2358   mStreamIDHash.Clear();
  2359   mStreamTransactionHash.Clear();
  2361   uint32_t goAwayReason;
  2362   if (NS_SUCCEEDED(aReason)) {
  2363     goAwayReason = OK;
  2364   } else if (aReason == NS_ERROR_ILLEGAL_VALUE) {
  2365     goAwayReason = PROTOCOL_ERROR;
  2366   } else {
  2367     goAwayReason = INTERNAL_ERROR;
  2369   GenerateGoAway(goAwayReason);
  2370   mConnection = nullptr;
  2371   mSegmentReader = nullptr;
  2372   mSegmentWriter = nullptr;
  2375 void
  2376 SpdySession31::CloseTransaction(nsAHttpTransaction *aTransaction,
  2377                                 nsresult aResult)
  2379   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  2380   LOG3(("SpdySession31::CloseTransaction %p %p %x", this, aTransaction, aResult));
  2382   // Generally this arrives as a cancel event from the connection manager.
  2384   // need to find the stream and call CleanupStream() on it.
  2385   SpdyStream31 *stream = mStreamTransactionHash.Get(aTransaction);
  2386   if (!stream) {
  2387     LOG3(("SpdySession31::CloseTransaction %p %p %x - not found.",
  2388           this, aTransaction, aResult));
  2389     return;
  2391   LOG3(("SpdySession31::CloseTranscation probably a cancel. "
  2392         "this=%p, trans=%p, result=%x, streamID=0x%X stream=%p",
  2393         this, aTransaction, aResult, stream->StreamID(), stream));
  2394   CleanupStream(stream, aResult, RST_CANCEL);
  2395   ResumeRecv();
  2398 //-----------------------------------------------------------------------------
  2399 // nsAHttpSegmentReader
  2400 //-----------------------------------------------------------------------------
  2402 nsresult
  2403 SpdySession31::OnReadSegment(const char *buf,
  2404                              uint32_t count,
  2405                              uint32_t *countRead)
  2407   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  2409   nsresult rv;
  2411   // If we can release old queued data then we can try and write the new
  2412   // data directly to the network without using the output queue at all
  2413   if (mOutputQueueUsed)
  2414     FlushOutputQueue();
  2416   if (!mOutputQueueUsed && mSegmentReader) {
  2417     // try and write directly without output queue
  2418     rv = mSegmentReader->OnReadSegment(buf, count, countRead);
  2420     if (rv == NS_BASE_STREAM_WOULD_BLOCK)
  2421       *countRead = 0;
  2422     else if (NS_FAILED(rv))
  2423       return rv;
  2425     if (*countRead < count) {
  2426       uint32_t required = count - *countRead;
  2427       // assuming a commitment() happened, this ensurebuffer is a nop
  2428       // but just in case the queuesize is too small for the required data
  2429       // call ensurebuffer().
  2430       EnsureBuffer(mOutputQueueBuffer, required, 0, mOutputQueueSize);
  2431       memcpy(mOutputQueueBuffer.get(), buf + *countRead, required);
  2432       mOutputQueueUsed = required;
  2435     *countRead = count;
  2436     return NS_OK;
  2439   // At this point we are going to buffer the new data in the output
  2440   // queue if it fits. By coalescing multiple small submissions into one larger
  2441   // buffer we can get larger writes out to the network later on.
  2443   // This routine should not be allowed to fill up the output queue
  2444   // all on its own - at least kQueueReserved bytes are always left
  2445   // for other routines to use - but this is an all-or-nothing function,
  2446   // so if it will not all fit just return WOULD_BLOCK
  2448   if ((mOutputQueueUsed + count) > (mOutputQueueSize - kQueueReserved))
  2449     return NS_BASE_STREAM_WOULD_BLOCK;
  2451   memcpy(mOutputQueueBuffer.get() + mOutputQueueUsed, buf, count);
  2452   mOutputQueueUsed += count;
  2453   *countRead = count;
  2455   FlushOutputQueue();
  2457   return NS_OK;
  2460 nsresult
  2461 SpdySession31::CommitToSegmentSize(uint32_t count, bool forceCommitment)
  2463   if (mOutputQueueUsed)
  2464     FlushOutputQueue();
  2466   // would there be enough room to buffer this if needed?
  2467   if ((mOutputQueueUsed + count) <= (mOutputQueueSize - kQueueReserved))
  2468     return NS_OK;
  2470   // if we are using part of our buffers already, try again later unless
  2471   // forceCommitment is set.
  2472   if (mOutputQueueUsed && !forceCommitment)
  2473     return NS_BASE_STREAM_WOULD_BLOCK;
  2475   if (mOutputQueueUsed) {
  2476     // normally we avoid the memmove of RealignOutputQueue, but we'll try
  2477     // it if forceCommitment is set before growing the buffer.
  2478     RealignOutputQueue();
  2480     // is there enough room now?
  2481     if ((mOutputQueueUsed + count) <= (mOutputQueueSize - kQueueReserved))
  2482       return NS_OK;
  2485   // resize the buffers as needed
  2486   EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + count + kQueueReserved,
  2487                mOutputQueueUsed, mOutputQueueSize);
  2489   MOZ_ASSERT((mOutputQueueUsed + count) <= (mOutputQueueSize - kQueueReserved),
  2490              "buffer not as large as expected");
  2492   return NS_OK;
  2495 //-----------------------------------------------------------------------------
  2496 // nsAHttpSegmentWriter
  2497 //-----------------------------------------------------------------------------
  2499 nsresult
  2500 SpdySession31::OnWriteSegment(char *buf,
  2501                               uint32_t count,
  2502                               uint32_t *countWritten)
  2504   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  2505   nsresult rv;
  2507   if (!mSegmentWriter) {
  2508     // the only way this could happen would be if Close() were called on the
  2509     // stack with WriteSegments()
  2510     return NS_ERROR_FAILURE;
  2513   if (mDownstreamState == PROCESSING_DATA_FRAME) {
  2515     if (mInputFrameDataLast &&
  2516         mInputFrameDataRead == mInputFrameDataSize) {
  2517       *countWritten = 0;
  2518       SetNeedsCleanup();
  2519       return NS_BASE_STREAM_CLOSED;
  2522     count = std::min(count, mInputFrameDataSize - mInputFrameDataRead);
  2523     rv = NetworkRead(mSegmentWriter, buf, count, countWritten);
  2524     if (NS_FAILED(rv))
  2525       return rv;
  2527     LogIO(this, mInputFrameDataStream, "Reading Data Frame",
  2528           buf, *countWritten);
  2530     mInputFrameDataRead += *countWritten;
  2532     mInputFrameDataStream->UpdateTransportReadEvents(*countWritten);
  2533     if ((mInputFrameDataRead == mInputFrameDataSize) && !mInputFrameDataLast)
  2534       ResetDownstreamState();
  2536     return rv;
  2539   if (mDownstreamState == PROCESSING_COMPLETE_HEADERS) {
  2541     if (mFlatHTTPResponseHeaders.Length() == mFlatHTTPResponseHeadersOut &&
  2542         mInputFrameDataLast) {
  2543       *countWritten = 0;
  2544       SetNeedsCleanup();
  2545       return NS_BASE_STREAM_CLOSED;
  2548     count = std::min(count,
  2549                      mFlatHTTPResponseHeaders.Length() -
  2550                      mFlatHTTPResponseHeadersOut);
  2551     memcpy(buf,
  2552            mFlatHTTPResponseHeaders.get() + mFlatHTTPResponseHeadersOut,
  2553            count);
  2554     mFlatHTTPResponseHeadersOut += count;
  2555     *countWritten = count;
  2557     if (mFlatHTTPResponseHeaders.Length() == mFlatHTTPResponseHeadersOut) {
  2558       if (mDataPending) {
  2559         // Now ready to process data frames - pop PROCESING_DATA_FRAME back onto
  2560         // the stack because receipt of that first data frame triggered the
  2561         // response header processing
  2562         mDataPending = false;
  2563         ChangeDownstreamState(PROCESSING_DATA_FRAME);
  2565       else if (!mInputFrameDataLast) {
  2566         // If more frames are expected in this stream, then reset the state so they can be
  2567         // handled. Otherwise (e.g. a 0 length response with the fin on the SYN_REPLY)
  2568         // stay in PROCESSING_COMPLETE_HEADERS state so the SetNeedsCleanup() code above can
  2569         // cleanup the stream.
  2570         ResetDownstreamState();
  2574     return NS_OK;
  2577   return NS_ERROR_UNEXPECTED;
  2580 void
  2581 SpdySession31::SetNeedsCleanup()
  2583   LOG3(("SpdySession31::SetNeedsCleanup %p - recorded downstream fin of "
  2584         "stream %p 0x%X", this, mInputFrameDataStream,
  2585         mInputFrameDataStream->StreamID()));
  2587   // This will result in Close() being called
  2588   MOZ_ASSERT(!mNeedsCleanup, "mNeedsCleanup unexpectedly set");
  2589   mNeedsCleanup = mInputFrameDataStream;
  2590   ResetDownstreamState();
  2593 void
  2594 SpdySession31::ConnectPushedStream(SpdyStream31 *stream)
  2596   mReadyForRead.Push(stream);
  2597   ForceRecv();
  2600 nsresult
  2601 SpdySession31::BufferOutput(const char *buf,
  2602                             uint32_t count,
  2603                             uint32_t *countRead)
  2605   nsAHttpSegmentReader *old = mSegmentReader;
  2606   mSegmentReader = nullptr;
  2607   nsresult rv = OnReadSegment(buf, count, countRead);
  2608   mSegmentReader = old;
  2609   return rv;
  2612 //-----------------------------------------------------------------------------
  2613 // Modified methods of nsAHttpConnection
  2614 //-----------------------------------------------------------------------------
  2616 void
  2617 SpdySession31::TransactionHasDataToWrite(nsAHttpTransaction *caller)
  2619   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  2620   LOG3(("SpdySession31::TransactionHasDataToWrite %p trans=%p", this, caller));
  2622   // a trapped signal from the http transaction to the connection that
  2623   // it is no longer blocked on read.
  2625   SpdyStream31 *stream = mStreamTransactionHash.Get(caller);
  2626   if (!stream || !VerifyStream(stream)) {
  2627     LOG3(("SpdySession31::TransactionHasDataToWrite %p caller %p not found",
  2628           this, caller));
  2629     return;
  2632   LOG3(("SpdySession31::TransactionHasDataToWrite %p ID is 0x%X\n",
  2633         this, stream->StreamID()));
  2635   mReadyForWrite.Push(stream);
  2638 void
  2639 SpdySession31::TransactionHasDataToWrite(SpdyStream31 *stream)
  2641   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  2642   LOG3(("SpdySession31::TransactionHasDataToWrite %p stream=%p ID=%x",
  2643         this, stream, stream->StreamID()));
  2645   mReadyForWrite.Push(stream);
  2646   SetWriteCallbacks();
  2649 bool
  2650 SpdySession31::IsPersistent()
  2652   return true;
  2655 nsresult
  2656 SpdySession31::TakeTransport(nsISocketTransport **,
  2657                              nsIAsyncInputStream **,
  2658                              nsIAsyncOutputStream **)
  2660   MOZ_ASSERT(false, "TakeTransport of SpdySession31");
  2661   return NS_ERROR_UNEXPECTED;
  2664 nsHttpConnection *
  2665 SpdySession31::TakeHttpConnection()
  2667   MOZ_ASSERT(false, "TakeHttpConnection of SpdySession31");
  2668   return nullptr;
  2671 uint32_t
  2672 SpdySession31::CancelPipeline(nsresult reason)
  2674   // we don't pipeline inside spdy, so this isn't an issue
  2675   return 0;
  2678 nsAHttpTransaction::Classifier
  2679 SpdySession31::Classification()
  2681   if (!mConnection)
  2682     return nsAHttpTransaction::CLASS_GENERAL;
  2683   return mConnection->Classification();
  2686 //-----------------------------------------------------------------------------
  2687 // unused methods of nsAHttpTransaction
  2688 // We can be sure of this because SpdySession31 is only constructed in
  2689 // nsHttpConnection and is never passed out of that object
  2690 //-----------------------------------------------------------------------------
  2692 void
  2693 SpdySession31::SetConnection(nsAHttpConnection *)
  2695   // This is unexpected
  2696   MOZ_ASSERT(false, "SpdySession31::SetConnection()");
  2699 void
  2700 SpdySession31::GetSecurityCallbacks(nsIInterfaceRequestor **)
  2702   // This is unexpected
  2703   MOZ_ASSERT(false, "SpdySession31::GetSecurityCallbacks()");
  2706 void
  2707 SpdySession31::SetProxyConnectFailed()
  2709   MOZ_ASSERT(false, "SpdySession31::SetProxyConnectFailed()");
  2712 bool
  2713 SpdySession31::IsDone()
  2715   return !mStreamTransactionHash.Count();
  2718 nsresult
  2719 SpdySession31::Status()
  2721   MOZ_ASSERT(false, "SpdySession31::Status()");
  2722   return NS_ERROR_UNEXPECTED;
  2725 uint32_t
  2726 SpdySession31::Caps()
  2728   MOZ_ASSERT(false, "SpdySession31::Caps()");
  2729   return 0;
  2732 void
  2733 SpdySession31::SetDNSWasRefreshed()
  2737 uint64_t
  2738 SpdySession31::Available()
  2740   MOZ_ASSERT(false, "SpdySession31::Available()");
  2741   return 0;
  2744 nsHttpRequestHead *
  2745 SpdySession31::RequestHead()
  2747   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  2748   MOZ_ASSERT(false,
  2749              "SpdySession31::RequestHead() "
  2750              "should not be called after SPDY is setup");
  2751   return nullptr;
  2754 uint32_t
  2755 SpdySession31::Http1xTransactionCount()
  2757   return 0;
  2760 // used as an enumerator by TakeSubTransactions()
  2761 static PLDHashOperator
  2762   TakeStream(nsAHttpTransaction *key,
  2763              nsAutoPtr<SpdyStream31> &stream,
  2764              void *closure)
  2766   nsTArray<nsRefPtr<nsAHttpTransaction> > *list =
  2767     static_cast<nsTArray<nsRefPtr<nsAHttpTransaction> > *>(closure);
  2769   list->AppendElement(key);
  2771   // removing the stream from the hash will delete the stream
  2772   // and drop the transaction reference the hash held
  2773   return PL_DHASH_REMOVE;
  2776 nsresult
  2777 SpdySession31::TakeSubTransactions(
  2778   nsTArray<nsRefPtr<nsAHttpTransaction> > &outTransactions)
  2780   // Generally this cannot be done with spdy as transactions are
  2781   // started right away.
  2783   LOG3(("SpdySession31::TakeSubTransactions %p\n", this));
  2785   if (mConcurrentHighWater > 0)
  2786     return NS_ERROR_ALREADY_OPENED;
  2788   LOG3(("   taking %d\n", mStreamTransactionHash.Count()));
  2790   mStreamTransactionHash.Enumerate(TakeStream, &outTransactions);
  2791   return NS_OK;
  2794 nsresult
  2795 SpdySession31::AddTransaction(nsAHttpTransaction *)
  2797   // This API is meant for pipelining, SpdySession31's should be
  2798   // extended with AddStream()
  2800   MOZ_ASSERT(false,
  2801              "SpdySession31::AddTransaction() should not be called");
  2803   return NS_ERROR_NOT_IMPLEMENTED;
  2806 uint32_t
  2807 SpdySession31::PipelineDepth()
  2809   return IsDone() ? 0 : 1;
  2812 nsresult
  2813 SpdySession31::SetPipelinePosition(int32_t position)
  2815   // This API is meant for pipelining, SpdySession31's should be
  2816   // extended with AddStream()
  2818   MOZ_ASSERT(false,
  2819              "SpdySession31::SetPipelinePosition() should not be called");
  2821   return NS_ERROR_NOT_IMPLEMENTED;
  2824 int32_t
  2825 SpdySession31::PipelinePosition()
  2827   return 0;
  2830 //-----------------------------------------------------------------------------
  2831 // Pass through methods of nsAHttpConnection
  2832 //-----------------------------------------------------------------------------
  2834 nsAHttpConnection *
  2835 SpdySession31::Connection()
  2837   MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
  2838   return mConnection;
  2841 nsresult
  2842 SpdySession31::OnHeadersAvailable(nsAHttpTransaction *transaction,
  2843                                   nsHttpRequestHead *requestHead,
  2844                                   nsHttpResponseHead *responseHead,
  2845                                   bool *reset)
  2847   return mConnection->OnHeadersAvailable(transaction,
  2848                                          requestHead,
  2849                                          responseHead,
  2850                                          reset);
  2853 bool
  2854 SpdySession31::IsReused()
  2856   return mConnection->IsReused();
  2859 nsresult
  2860 SpdySession31::PushBack(const char *buf, uint32_t len)
  2862   return mConnection->PushBack(buf, len);
  2865 } // namespace mozilla::net
  2866 } // namespace mozilla

mercurial