michael@0: /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: // HttpLog.h should generally be included first michael@0: #include "HttpLog.h" michael@0: michael@0: #include "nsHttpPipeline.h" michael@0: #include "nsHttpHandler.h" michael@0: #include "nsIOService.h" michael@0: #include "nsISocketTransport.h" michael@0: #include "nsIPipe.h" michael@0: #include "nsCOMPtr.h" michael@0: #include michael@0: #include "nsHttpRequestHead.h" michael@0: michael@0: #ifdef DEBUG michael@0: #include "prthread.h" michael@0: // defined by the socket transport service while active michael@0: extern PRThread *gSocketThread; michael@0: #endif michael@0: michael@0: namespace mozilla { michael@0: namespace net { michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsHttpPushBackWriter michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: class nsHttpPushBackWriter : public nsAHttpSegmentWriter michael@0: { michael@0: public: michael@0: nsHttpPushBackWriter(const char *buf, uint32_t bufLen) michael@0: : mBuf(buf) michael@0: , mBufLen(bufLen) michael@0: { } michael@0: virtual ~nsHttpPushBackWriter() {} michael@0: michael@0: nsresult OnWriteSegment(char *buf, uint32_t count, uint32_t *countWritten) michael@0: { michael@0: if (mBufLen == 0) michael@0: return NS_BASE_STREAM_CLOSED; michael@0: michael@0: if (count > mBufLen) michael@0: count = mBufLen; michael@0: michael@0: memcpy(buf, mBuf, count); michael@0: michael@0: mBuf += count; michael@0: mBufLen -= count; michael@0: *countWritten = count; michael@0: return NS_OK; michael@0: } michael@0: michael@0: private: michael@0: const char *mBuf; michael@0: uint32_t mBufLen; michael@0: }; michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsHttpPipeline michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: nsHttpPipeline::nsHttpPipeline() michael@0: : mConnection(nullptr) michael@0: , mStatus(NS_OK) michael@0: , mRequestIsPartial(false) michael@0: , mResponseIsPartial(false) michael@0: , mClosed(false) michael@0: , mUtilizedPipeline(false) michael@0: , mPushBackBuf(nullptr) michael@0: , mPushBackLen(0) michael@0: , mPushBackMax(0) michael@0: , mHttp1xTransactionCount(0) michael@0: , mReceivingFromProgress(0) michael@0: , mSendingToProgress(0) michael@0: , mSuppressSendEvents(true) michael@0: { michael@0: } michael@0: michael@0: nsHttpPipeline::~nsHttpPipeline() michael@0: { michael@0: // make sure we aren't still holding onto any transactions! michael@0: Close(NS_ERROR_ABORT); michael@0: michael@0: NS_IF_RELEASE(mConnection); michael@0: michael@0: if (mPushBackBuf) michael@0: free(mPushBackBuf); michael@0: } michael@0: michael@0: // Generate a shuffled request ordering sequence michael@0: void michael@0: nsHttpPipeline::ShuffleTransOrder(uint32_t count) michael@0: { michael@0: if (count < 2) michael@0: return; michael@0: michael@0: uint32_t pos = mRequestQ[0]->PipelinePosition(); michael@0: uint32_t i = 0; michael@0: michael@0: for (i=0; i < count; ++i) { michael@0: uint32_t ridx = rand() % count; michael@0: michael@0: nsAHttpTransaction *tmp = mRequestQ[i]; michael@0: mRequestQ[i] = mRequestQ[ridx]; michael@0: mRequestQ[ridx] = tmp; michael@0: } michael@0: michael@0: for (i=0; i < count; ++i) { michael@0: mRequestQ[i]->SetPipelinePosition(pos); michael@0: pos++; michael@0: } michael@0: michael@0: LOG(("nsHttpPipeline::ShuffleTransOrder: Shuffled %d transactions.\n", count)); michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::AddTransaction(nsAHttpTransaction *trans) michael@0: { michael@0: LOG(("nsHttpPipeline::AddTransaction [this=%p trans=%x]\n", this, trans)); michael@0: michael@0: if (mRequestQ.Length() || mResponseQ.Length()) michael@0: mUtilizedPipeline = true; michael@0: michael@0: NS_ADDREF(trans); michael@0: mRequestQ.AppendElement(trans); michael@0: uint32_t qlen = PipelineDepth(); michael@0: michael@0: if (qlen != 1) { michael@0: trans->SetPipelinePosition(qlen); michael@0: } michael@0: else { michael@0: // do it for this case in case an idempotent cancellation michael@0: // is being repeated and an old value needs to be cleared michael@0: trans->SetPipelinePosition(0); michael@0: } michael@0: michael@0: // trans->SetConnection() needs to be updated to point back at michael@0: // the pipeline object. michael@0: trans->SetConnection(this); michael@0: michael@0: ShuffleTransOrder(mRequestQ.Length()); michael@0: michael@0: if (mConnection && !mClosed && mRequestQ.Length() == 1) michael@0: mConnection->ResumeSend(); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: uint32_t michael@0: nsHttpPipeline::PipelineDepth() michael@0: { michael@0: return mRequestQ.Length() + mResponseQ.Length(); michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::SetPipelinePosition(int32_t position) michael@0: { michael@0: nsAHttpTransaction *trans = Response(0); michael@0: if (trans) michael@0: return trans->SetPipelinePosition(position); michael@0: return NS_OK; michael@0: } michael@0: michael@0: int32_t michael@0: nsHttpPipeline::PipelinePosition() michael@0: { michael@0: nsAHttpTransaction *trans = Response(0); michael@0: if (trans) michael@0: return trans->PipelinePosition(); michael@0: michael@0: // The response queue is empty, so return oldest request michael@0: if (mRequestQ.Length()) michael@0: return Request(mRequestQ.Length() - 1)->PipelinePosition(); michael@0: michael@0: // No transactions in the pipeline michael@0: return 0; michael@0: } michael@0: michael@0: nsHttpPipeline * michael@0: nsHttpPipeline::QueryPipeline() michael@0: { michael@0: return this; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsHttpPipeline::nsISupports michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: NS_IMPL_ADDREF(nsHttpPipeline) michael@0: NS_IMPL_RELEASE(nsHttpPipeline) michael@0: michael@0: // multiple inheritance fun :-) michael@0: NS_INTERFACE_MAP_BEGIN(nsHttpPipeline) michael@0: NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsAHttpConnection) michael@0: NS_INTERFACE_MAP_END michael@0: michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsHttpPipeline::nsAHttpConnection michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: nsresult michael@0: nsHttpPipeline::OnHeadersAvailable(nsAHttpTransaction *trans, michael@0: nsHttpRequestHead *requestHead, michael@0: nsHttpResponseHead *responseHead, michael@0: bool *reset) michael@0: { michael@0: LOG(("nsHttpPipeline::OnHeadersAvailable [this=%p]\n", this)); michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: MOZ_ASSERT(mConnection, "no connection"); michael@0: michael@0: nsRefPtr ci; michael@0: GetConnectionInfo(getter_AddRefs(ci)); michael@0: MOZ_ASSERT(ci); michael@0: michael@0: bool pipeliningBefore = gHttpHandler->ConnMgr()->SupportsPipelining(ci); michael@0: michael@0: // trans has now received its response headers; forward to the real connection michael@0: nsresult rv = mConnection->OnHeadersAvailable(trans, michael@0: requestHead, michael@0: responseHead, michael@0: reset); michael@0: michael@0: if (!pipeliningBefore && gHttpHandler->ConnMgr()->SupportsPipelining(ci)) michael@0: // The received headers have expanded the eligible michael@0: // pipeline depth for this connection michael@0: gHttpHandler->ConnMgr()->ProcessPendingQForEntry(ci); michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: void michael@0: nsHttpPipeline::CloseTransaction(nsAHttpTransaction *trans, nsresult reason) michael@0: { michael@0: LOG(("nsHttpPipeline::CloseTransaction [this=%p trans=%x reason=%x]\n", michael@0: this, trans, reason)); michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: MOZ_ASSERT(NS_FAILED(reason), "expecting failure code"); michael@0: michael@0: // the specified transaction is to be closed with the given "reason" michael@0: michael@0: int32_t index; michael@0: bool killPipeline = false; michael@0: michael@0: index = mRequestQ.IndexOf(trans); michael@0: if (index >= 0) { michael@0: if (index == 0 && mRequestIsPartial) { michael@0: // the transaction is in the request queue. check to see if any of michael@0: // its data has been written out yet. michael@0: killPipeline = true; michael@0: } michael@0: mRequestQ.RemoveElementAt(index); michael@0: } michael@0: else { michael@0: index = mResponseQ.IndexOf(trans); michael@0: if (index >= 0) michael@0: mResponseQ.RemoveElementAt(index); michael@0: // while we could avoid killing the pipeline if this transaction is the michael@0: // last transaction in the pipeline, there doesn't seem to be that much michael@0: // value in doing so. most likely if this transaction is going away, michael@0: // the others will be shortly as well. michael@0: killPipeline = true; michael@0: } michael@0: michael@0: // Marking this connection as non-reusable prevents other items from being michael@0: // added to it and causes it to be torn down soon. michael@0: DontReuse(); michael@0: michael@0: trans->Close(reason); michael@0: NS_RELEASE(trans); michael@0: michael@0: if (killPipeline) { michael@0: // reschedule anything from this pipeline onto a different connection michael@0: CancelPipeline(reason); michael@0: } michael@0: michael@0: // If all the transactions have been removed then we can close the connection michael@0: // right away. michael@0: if (!mRequestQ.Length() && !mResponseQ.Length() && mConnection) michael@0: mConnection->CloseTransaction(this, reason); michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::TakeTransport(nsISocketTransport **aTransport, michael@0: nsIAsyncInputStream **aInputStream, michael@0: nsIAsyncOutputStream **aOutputStream) michael@0: { michael@0: return mConnection->TakeTransport(aTransport, aInputStream, aOutputStream); michael@0: } michael@0: michael@0: bool michael@0: nsHttpPipeline::IsPersistent() michael@0: { michael@0: return true; // pipelining requires this michael@0: } michael@0: michael@0: bool michael@0: nsHttpPipeline::IsReused() michael@0: { michael@0: if (!mUtilizedPipeline && mConnection) michael@0: return mConnection->IsReused(); michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: nsHttpPipeline::DontReuse() michael@0: { michael@0: if (mConnection) michael@0: mConnection->DontReuse(); michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::PushBack(const char *data, uint32_t length) michael@0: { michael@0: LOG(("nsHttpPipeline::PushBack [this=%p len=%u]\n", this, length)); michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: MOZ_ASSERT(mPushBackLen == 0, "push back buffer already has data!"); michael@0: michael@0: // If we have no chance for a pipeline (e.g. due to an Upgrade) michael@0: // then push this data down to original connection michael@0: if (!mConnection->IsPersistent()) michael@0: return mConnection->PushBack(data, length); michael@0: michael@0: // PushBack is called recursively from WriteSegments michael@0: michael@0: // XXX we have a design decision to make here. either we buffer the data michael@0: // and process it when we return to WriteSegments, or we attempt to move michael@0: // onto the next transaction from here. doing so adds complexity with the michael@0: // benefit of eliminating the extra buffer copy. the buffer is at most michael@0: // 4096 bytes, so it is really unclear if there is any value in the added michael@0: // complexity. besides simplicity, buffering this data has the advantage michael@0: // that we'll call close on the transaction sooner, which will wake up michael@0: // the HTTP channel sooner to continue with its work. michael@0: michael@0: if (!mPushBackBuf) { michael@0: mPushBackMax = length; michael@0: mPushBackBuf = (char *) malloc(mPushBackMax); michael@0: if (!mPushBackBuf) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: } michael@0: else if (length > mPushBackMax) { michael@0: // grow push back buffer as necessary. michael@0: MOZ_ASSERT(length <= nsIOService::gDefaultSegmentSize, "too big"); michael@0: mPushBackMax = length; michael@0: mPushBackBuf = (char *) realloc(mPushBackBuf, mPushBackMax); michael@0: if (!mPushBackBuf) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: } michael@0: michael@0: memcpy(mPushBackBuf, data, length); michael@0: mPushBackLen = length; michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsHttpConnection * michael@0: nsHttpPipeline::TakeHttpConnection() michael@0: { michael@0: if (mConnection) michael@0: return mConnection->TakeHttpConnection(); michael@0: return nullptr; michael@0: } michael@0: michael@0: nsAHttpTransaction::Classifier michael@0: nsHttpPipeline::Classification() michael@0: { michael@0: if (mConnection) michael@0: return mConnection->Classification(); michael@0: michael@0: LOG(("nsHttpPipeline::Classification this=%p " michael@0: "has null mConnection using CLASS_SOLO default", this)); michael@0: return nsAHttpTransaction::CLASS_SOLO; michael@0: } michael@0: michael@0: void michael@0: nsHttpPipeline::SetProxyConnectFailed() michael@0: { michael@0: nsAHttpTransaction *trans = Request(0); michael@0: michael@0: if (trans) michael@0: trans->SetProxyConnectFailed(); michael@0: } michael@0: michael@0: nsHttpRequestHead * michael@0: nsHttpPipeline::RequestHead() michael@0: { michael@0: nsAHttpTransaction *trans = Request(0); michael@0: michael@0: if (trans) michael@0: return trans->RequestHead(); michael@0: return nullptr; michael@0: } michael@0: michael@0: uint32_t michael@0: nsHttpPipeline::Http1xTransactionCount() michael@0: { michael@0: return mHttp1xTransactionCount; michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::TakeSubTransactions( michael@0: nsTArray > &outTransactions) michael@0: { michael@0: LOG(("nsHttpPipeline::TakeSubTransactions [this=%p]\n", this)); michael@0: michael@0: if (mResponseQ.Length() || mRequestIsPartial) michael@0: return NS_ERROR_ALREADY_OPENED; michael@0: michael@0: int32_t i, count = mRequestQ.Length(); michael@0: for (i = 0; i < count; ++i) { michael@0: nsAHttpTransaction *trans = Request(i); michael@0: // set the transaction conneciton object back to the underlying michael@0: // nsHttpConnectionHandle michael@0: trans->SetConnection(mConnection); michael@0: outTransactions.AppendElement(trans); michael@0: NS_RELEASE(trans); michael@0: } michael@0: mRequestQ.Clear(); michael@0: michael@0: LOG((" took %d\n", count)); michael@0: return NS_OK; michael@0: } michael@0: michael@0: //----------------------------------------------------------------------------- michael@0: // nsHttpPipeline::nsAHttpTransaction michael@0: //----------------------------------------------------------------------------- michael@0: michael@0: void michael@0: nsHttpPipeline::SetConnection(nsAHttpConnection *conn) michael@0: { michael@0: LOG(("nsHttpPipeline::SetConnection [this=%p conn=%x]\n", this, conn)); michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: MOZ_ASSERT(!mConnection, "already have a connection"); michael@0: michael@0: NS_IF_ADDREF(mConnection = conn); michael@0: } michael@0: michael@0: nsAHttpConnection * michael@0: nsHttpPipeline::Connection() michael@0: { michael@0: LOG(("nsHttpPipeline::Connection [this=%p conn=%x]\n", this, mConnection)); michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: return mConnection; michael@0: } michael@0: michael@0: void michael@0: nsHttpPipeline::GetSecurityCallbacks(nsIInterfaceRequestor **result) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: // depending on timing this could be either the request or the response michael@0: // that is needed - but they both go to the same host. A request for these michael@0: // callbacks directly in nsHttpTransaction would not make a distinction michael@0: // over whether the the request had been transmitted yet. michael@0: nsAHttpTransaction *trans = Request(0); michael@0: if (!trans) michael@0: trans = Response(0); michael@0: if (trans) michael@0: trans->GetSecurityCallbacks(result); michael@0: else { michael@0: *result = nullptr; michael@0: } michael@0: } michael@0: michael@0: void michael@0: nsHttpPipeline::OnTransportStatus(nsITransport* transport, michael@0: nsresult status, uint64_t progress) michael@0: { michael@0: LOG(("nsHttpPipeline::OnStatus [this=%p status=%x progress=%llu]\n", michael@0: this, status, progress)); michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: nsAHttpTransaction *trans; michael@0: int32_t i, count; michael@0: michael@0: switch (status) { michael@0: michael@0: case NS_NET_STATUS_RESOLVING_HOST: michael@0: case NS_NET_STATUS_RESOLVED_HOST: michael@0: case NS_NET_STATUS_CONNECTING_TO: michael@0: case NS_NET_STATUS_CONNECTED_TO: michael@0: // These should only appear at most once per pipeline. michael@0: // Deliver to the first transaction. michael@0: michael@0: trans = Request(0); michael@0: if (!trans) michael@0: trans = Response(0); michael@0: if (trans) michael@0: trans->OnTransportStatus(transport, status, progress); michael@0: michael@0: break; michael@0: michael@0: case NS_NET_STATUS_SENDING_TO: michael@0: // This is generated by the socket transport when (part) of michael@0: // a transaction is written out michael@0: // michael@0: // In pipelining this is generated out of FillSendBuf(), but it cannot do michael@0: // so until the connection is confirmed by CONNECTED_TO. michael@0: // See patch for bug 196827. michael@0: // michael@0: michael@0: if (mSuppressSendEvents) { michael@0: mSuppressSendEvents = false; michael@0: michael@0: // catch up by sending the event to all the transactions that have michael@0: // moved from request to response and any that have been partially michael@0: // sent. Also send WAITING_FOR to those that were completely sent michael@0: count = mResponseQ.Length(); michael@0: for (i = 0; i < count; ++i) { michael@0: Response(i)->OnTransportStatus(transport, michael@0: NS_NET_STATUS_SENDING_TO, michael@0: progress); michael@0: Response(i)->OnTransportStatus(transport, michael@0: NS_NET_STATUS_WAITING_FOR, michael@0: progress); michael@0: } michael@0: if (mRequestIsPartial && Request(0)) michael@0: Request(0)->OnTransportStatus(transport, michael@0: NS_NET_STATUS_SENDING_TO, michael@0: progress); michael@0: mSendingToProgress = progress; michael@0: } michael@0: // otherwise ignore it michael@0: break; michael@0: michael@0: case NS_NET_STATUS_WAITING_FOR: michael@0: // Created by nsHttpConnection when request pipeline has been totally michael@0: // sent. Ignore it here because it is simulated in FillSendBuf() when michael@0: // a request is moved from request to response. michael@0: michael@0: // ignore it michael@0: break; michael@0: michael@0: case NS_NET_STATUS_RECEIVING_FROM: michael@0: // Forward this only to the transaction currently recieving data. It is michael@0: // normally generated by the socket transport, but can also michael@0: // be repeated by the pushbackwriter if necessary. michael@0: mReceivingFromProgress = progress; michael@0: if (Response(0)) michael@0: Response(0)->OnTransportStatus(transport, status, progress); michael@0: break; michael@0: michael@0: default: michael@0: // forward other notifications to all request transactions michael@0: count = mRequestQ.Length(); michael@0: for (i = 0; i < count; ++i) michael@0: Request(i)->OnTransportStatus(transport, status, progress); michael@0: break; michael@0: } michael@0: } michael@0: michael@0: bool michael@0: nsHttpPipeline::IsDone() michael@0: { michael@0: bool done = true; michael@0: michael@0: uint32_t i, count = mRequestQ.Length(); michael@0: for (i = 0; done && (i < count); i++) michael@0: done = Request(i)->IsDone(); michael@0: michael@0: count = mResponseQ.Length(); michael@0: for (i = 0; done && (i < count); i++) michael@0: done = Response(i)->IsDone(); michael@0: michael@0: return done; michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::Status() michael@0: { michael@0: return mStatus; michael@0: } michael@0: michael@0: uint32_t michael@0: nsHttpPipeline::Caps() michael@0: { michael@0: nsAHttpTransaction *trans = Request(0); michael@0: if (!trans) michael@0: trans = Response(0); michael@0: michael@0: return trans ? trans->Caps() : 0; michael@0: } michael@0: michael@0: void michael@0: nsHttpPipeline::SetDNSWasRefreshed() michael@0: { michael@0: nsAHttpTransaction *trans = Request(0); michael@0: if (!trans) michael@0: trans = Response(0); michael@0: michael@0: if (trans) michael@0: trans->SetDNSWasRefreshed(); michael@0: } michael@0: michael@0: uint64_t michael@0: nsHttpPipeline::Available() michael@0: { michael@0: uint64_t result = 0; michael@0: michael@0: int32_t i, count = mRequestQ.Length(); michael@0: for (i=0; iAvailable(); michael@0: return result; michael@0: } michael@0: michael@0: NS_METHOD michael@0: nsHttpPipeline::ReadFromPipe(nsIInputStream *stream, michael@0: void *closure, michael@0: const char *buf, michael@0: uint32_t offset, michael@0: uint32_t count, michael@0: uint32_t *countRead) michael@0: { michael@0: nsHttpPipeline *self = (nsHttpPipeline *) closure; michael@0: return self->mReader->OnReadSegment(buf, count, countRead); michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::ReadSegments(nsAHttpSegmentReader *reader, michael@0: uint32_t count, michael@0: uint32_t *countRead) michael@0: { michael@0: LOG(("nsHttpPipeline::ReadSegments [this=%p count=%u]\n", this, count)); michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: if (mClosed) { michael@0: *countRead = 0; michael@0: return mStatus; michael@0: } michael@0: michael@0: nsresult rv; michael@0: uint64_t avail = 0; michael@0: if (mSendBufIn) { michael@0: rv = mSendBufIn->Available(&avail); michael@0: if (NS_FAILED(rv)) return rv; michael@0: } michael@0: michael@0: if (avail == 0) { michael@0: rv = FillSendBuf(); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: rv = mSendBufIn->Available(&avail); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: // return EOF if send buffer is empty michael@0: if (avail == 0) { michael@0: *countRead = 0; michael@0: return NS_OK; michael@0: } michael@0: } michael@0: michael@0: // read no more than what was requested michael@0: if (avail > count) michael@0: avail = count; michael@0: michael@0: mReader = reader; michael@0: michael@0: // avail is under 4GB, so casting to uint32_t is safe michael@0: rv = mSendBufIn->ReadSegments(ReadFromPipe, this, (uint32_t)avail, countRead); michael@0: michael@0: mReader = nullptr; michael@0: return rv; michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::WriteSegments(nsAHttpSegmentWriter *writer, michael@0: uint32_t count, michael@0: uint32_t *countWritten) michael@0: { michael@0: LOG(("nsHttpPipeline::WriteSegments [this=%p count=%u]\n", this, count)); michael@0: michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: if (mClosed) michael@0: return NS_SUCCEEDED(mStatus) ? NS_BASE_STREAM_CLOSED : mStatus; michael@0: michael@0: nsAHttpTransaction *trans; michael@0: nsresult rv; michael@0: michael@0: trans = Response(0); michael@0: // This code deals with the establishment of a CONNECT tunnel through michael@0: // an HTTP proxy. It allows the connection to do the CONNECT/200 michael@0: // HTTP transaction to establish a tunnel as a precursor to the michael@0: // actual pipeline of regular HTTP transactions. michael@0: if (!trans && mRequestQ.Length() && michael@0: mConnection->IsProxyConnectInProgress()) { michael@0: LOG(("nsHttpPipeline::WriteSegments [this=%p] Forced Delegation\n", michael@0: this)); michael@0: trans = Request(0); michael@0: } michael@0: michael@0: if (!trans) { michael@0: if (mRequestQ.Length() > 0) michael@0: rv = NS_BASE_STREAM_WOULD_BLOCK; michael@0: else michael@0: rv = NS_BASE_STREAM_CLOSED; michael@0: } michael@0: else { michael@0: // michael@0: // ask the transaction to consume data from the connection. michael@0: // PushBack may be called recursively. michael@0: // michael@0: rv = trans->WriteSegments(writer, count, countWritten); michael@0: michael@0: if (rv == NS_BASE_STREAM_CLOSED || trans->IsDone()) { michael@0: trans->Close(NS_OK); michael@0: michael@0: // Release the transaction if it is not IsProxyConnectInProgress() michael@0: if (trans == Response(0)) { michael@0: NS_RELEASE(trans); michael@0: mResponseQ.RemoveElementAt(0); michael@0: mResponseIsPartial = false; michael@0: ++mHttp1xTransactionCount; michael@0: } michael@0: michael@0: // ask the connection manager to add additional transactions michael@0: // to our pipeline. michael@0: nsRefPtr ci; michael@0: GetConnectionInfo(getter_AddRefs(ci)); michael@0: if (ci) michael@0: gHttpHandler->ConnMgr()->ProcessPendingQForEntry(ci); michael@0: } michael@0: else michael@0: mResponseIsPartial = true; michael@0: } michael@0: michael@0: if (mPushBackLen) { michael@0: nsHttpPushBackWriter writer(mPushBackBuf, mPushBackLen); michael@0: uint32_t len = mPushBackLen, n; michael@0: mPushBackLen = 0; michael@0: michael@0: // This progress notification has previously been sent from michael@0: // the socket transport code, but it was delivered to the michael@0: // previous transaction on the pipeline. michael@0: nsITransport *transport = Transport(); michael@0: if (transport) michael@0: OnTransportStatus(transport, NS_NET_STATUS_RECEIVING_FROM, michael@0: mReceivingFromProgress); michael@0: michael@0: // the push back buffer is never larger than NS_HTTP_SEGMENT_SIZE, michael@0: // so we are guaranteed that the next response will eat the entire michael@0: // push back buffer (even though it might again call PushBack). michael@0: rv = WriteSegments(&writer, len, &n); michael@0: } michael@0: michael@0: return rv; michael@0: } michael@0: michael@0: uint32_t michael@0: nsHttpPipeline::CancelPipeline(nsresult originalReason) michael@0: { michael@0: uint32_t i, reqLen, respLen, total; michael@0: nsAHttpTransaction *trans; michael@0: michael@0: reqLen = mRequestQ.Length(); michael@0: respLen = mResponseQ.Length(); michael@0: total = reqLen + respLen; michael@0: michael@0: // don't count the first response, if presnet michael@0: if (respLen) michael@0: total--; michael@0: michael@0: if (!total) michael@0: return 0; michael@0: michael@0: // any pending requests can ignore this error and be restarted michael@0: // unless it is during a CONNECT tunnel request michael@0: for (i = 0; i < reqLen; ++i) { michael@0: trans = Request(i); michael@0: if (mConnection && mConnection->IsProxyConnectInProgress()) michael@0: trans->Close(originalReason); michael@0: else michael@0: trans->Close(NS_ERROR_NET_RESET); michael@0: NS_RELEASE(trans); michael@0: } michael@0: mRequestQ.Clear(); michael@0: michael@0: // any pending responses can be restarted except for the first one, michael@0: // that we might want to finish on this pipeline or cancel individually. michael@0: // Higher levels of callers ensure that we don't process non-idempotent michael@0: // tranasction with the NS_HTTP_ALLOW_PIPELINING bit set michael@0: for (i = 1; i < respLen; ++i) { michael@0: trans = Response(i); michael@0: trans->Close(NS_ERROR_NET_RESET); michael@0: NS_RELEASE(trans); michael@0: } michael@0: michael@0: if (respLen > 1) michael@0: mResponseQ.TruncateLength(1); michael@0: michael@0: /* Don't flag timed out connections as unreusable.. Tor is just slow :( */ michael@0: if (originalReason != NS_ERROR_NET_TIMEOUT) { michael@0: DontReuse(); michael@0: Classify(nsAHttpTransaction::CLASS_SOLO); michael@0: } michael@0: michael@0: return total; michael@0: } michael@0: michael@0: void michael@0: nsHttpPipeline::Close(nsresult reason) michael@0: { michael@0: LOG(("nsHttpPipeline::Close [this=%p reason=%x]\n", this, reason)); michael@0: michael@0: if (mClosed) { michael@0: LOG((" already closed\n")); michael@0: return; michael@0: } michael@0: michael@0: // the connection is going away! michael@0: mStatus = reason; michael@0: mClosed = true; michael@0: michael@0: nsRefPtr ci; michael@0: GetConnectionInfo(getter_AddRefs(ci)); michael@0: uint32_t numRescheduled = CancelPipeline(reason); michael@0: michael@0: // numRescheduled can be 0 if there is just a single response in the michael@0: // pipeline object. That isn't really a meaningful pipeline that michael@0: // has been forced to be rescheduled so it does not need to generate michael@0: // negative feedback. michael@0: if (ci && numRescheduled) michael@0: gHttpHandler->ConnMgr()->PipelineFeedbackInfo( michael@0: ci, nsHttpConnectionMgr::RedCanceledPipeline, nullptr, 0); michael@0: michael@0: nsAHttpTransaction *trans = Response(0); michael@0: if (!trans) michael@0: return; michael@0: michael@0: // The current transaction can be restarted via reset michael@0: // if the response has not started to arrive and the reason michael@0: // for failure is innocuous (e.g. not an SSL error) michael@0: if (!mResponseIsPartial && michael@0: (reason == NS_ERROR_NET_RESET || michael@0: reason == NS_OK || michael@0: reason == NS_ERROR_NET_TIMEOUT || michael@0: reason == NS_BASE_STREAM_CLOSED)) { michael@0: trans->Close(NS_ERROR_NET_RESET); michael@0: } michael@0: else { michael@0: trans->Close(reason); michael@0: } michael@0: michael@0: NS_RELEASE(trans); michael@0: mResponseQ.Clear(); michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::OnReadSegment(const char *segment, michael@0: uint32_t count, michael@0: uint32_t *countRead) michael@0: { michael@0: return mSendBufOut->Write(segment, count, countRead); michael@0: } michael@0: michael@0: nsresult michael@0: nsHttpPipeline::FillSendBuf() michael@0: { michael@0: // reads from request queue, moving transactions to response queue michael@0: // when they have been completely read. michael@0: michael@0: nsresult rv; michael@0: michael@0: if (!mSendBufIn) { michael@0: // allocate a single-segment pipe michael@0: rv = NS_NewPipe(getter_AddRefs(mSendBufIn), michael@0: getter_AddRefs(mSendBufOut), michael@0: nsIOService::gDefaultSegmentSize, /* segment size */ michael@0: nsIOService::gDefaultSegmentSize, /* max size */ michael@0: true, true); michael@0: if (NS_FAILED(rv)) return rv; michael@0: } michael@0: michael@0: uint32_t n; michael@0: uint64_t avail; michael@0: uint64_t totalSent = 0; michael@0: uint64_t reqsSent = 0; michael@0: uint64_t alreadyPending = 0; michael@0: michael@0: mSendBufIn->Available(&alreadyPending); michael@0: michael@0: nsAHttpTransaction *trans; michael@0: nsITransport *transport = Transport(); michael@0: #ifdef WTF_TEST michael@0: uint64_t totalAvailable = Available(); michael@0: nsRefPtr ci; michael@0: GetConnectionInfo(getter_AddRefs(ci)); michael@0: #endif michael@0: michael@0: while ((trans = Request(0)) != nullptr) { michael@0: avail = trans->Available(); michael@0: if (avail) { michael@0: // if there is already a response in the responseq then this michael@0: // new data comprises a pipeline. Update the transaction in the michael@0: // response queue to reflect that if necessary. We are now sending michael@0: // out a request while we haven't received all responses. michael@0: nsAHttpTransaction *response = Response(0); michael@0: if (response && !response->PipelinePosition()) michael@0: response->SetPipelinePosition(1); michael@0: rv = trans->ReadSegments(this, (uint32_t)std::min(avail, (uint64_t)UINT32_MAX), &n); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: if (n == 0) { michael@0: LOG(("send pipe is full")); michael@0: break; michael@0: } michael@0: michael@0: mSendingToProgress += n; michael@0: totalSent += n; michael@0: if (!mSuppressSendEvents && transport) { michael@0: // Simulate a SENDING_TO event michael@0: trans->OnTransportStatus(transport, michael@0: NS_NET_STATUS_SENDING_TO, michael@0: mSendingToProgress); michael@0: } michael@0: } michael@0: michael@0: avail = trans->Available(); michael@0: if (avail == 0) { michael@0: #ifdef WTF_TEST michael@0: nsHttpRequestHead *head = trans->RequestHead(); michael@0: fprintf(stderr, "WTF-order: Pipelined req %d/%d (%dB). Url: %s%s\n", michael@0: trans->PipelinePosition(), PipelineDepth(), n, michael@0: ci->Host(), head ? head->RequestURI().BeginReading() : ""); michael@0: #endif michael@0: reqsSent++; michael@0: michael@0: // move transaction from request queue to response queue michael@0: mRequestQ.RemoveElementAt(0); michael@0: mResponseQ.AppendElement(trans); michael@0: mRequestIsPartial = false; michael@0: michael@0: if (!mSuppressSendEvents && transport) { michael@0: // Simulate a WAITING_FOR event michael@0: trans->OnTransportStatus(transport, michael@0: NS_NET_STATUS_WAITING_FOR, michael@0: mSendingToProgress); michael@0: } michael@0: michael@0: // It would be good to re-enable data read handlers via ResumeRecv() michael@0: // except the read handler code can be synchronously dispatched on michael@0: // the stack. michael@0: } michael@0: else michael@0: mRequestIsPartial = true; michael@0: } michael@0: michael@0: #ifdef WTF_TEST michael@0: if (totalSent) michael@0: fprintf(stderr, "WTF-combine: Sent %ld/%ld bytes of %ld combined pipelined requests for host %s\n", michael@0: alreadyPending+totalSent, totalAvailable, reqsSent, ci->Host()); michael@0: #endif michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: } // namespace mozilla::net michael@0: } // namespace mozilla