diff --git a/src/Pipeline.cc b/src/Pipeline.cc index 7d1429cec1c..03add1e93ca 100644 --- a/src/Pipeline.cc +++ b/src/Pipeline.cc @@ -19,6 +19,9 @@ void Pipeline::add(const Http::StreamPointer &c) { + Assure(!mayUseConnection()); + Assure(!poppedBusy); + c->mayUseConnection(true); requests.push_back(c); ++nrequests; debugs(33, 3, "Pipeline " << (void*)this << " add request " << nrequests << ' ' << c); @@ -48,6 +51,12 @@ Pipeline::back() const return requests.back(); } +bool +Pipeline::mayUseConnection() const +{ + return requests.empty() ? false : requests.front()->mayUseConnection(); +} + void Pipeline::popMe(const Http::StreamPointer &which) { @@ -58,6 +67,10 @@ Pipeline::popMe(const Http::StreamPointer &which) // in reality there may be multiple contexts doing processing in parallel. // XXX: pipeline still assumes HTTP/1 FIFO semantics are obeyed. assert(which == requests.front()); + if (which->mayUseConnection()) { + Assure(requests.size() == 1); + poppedBusy = true; + } requests.pop_front(); } diff --git a/src/Pipeline.h b/src/Pipeline.h index 54a9e157079..983f02d47e6 100644 --- a/src/Pipeline.h +++ b/src/Pipeline.h @@ -58,10 +58,16 @@ class Pipeline /// deregister the front request from the pipeline void popMe(const Http::StreamPointer &); + /// whether some of the queued transactions use the connection + bool mayUseConnection() const; + /// Number of requests seen in this pipeline (so far). /// Includes incomplete transactions. uint32_t nrequests; + /// whether popMe() was called on a transaction with mayUseConnection() + bool poppedBusy = false; + private: /// requests parsed from the connection but not yet completed. std::list requests; diff --git a/src/client_side.cc b/src/client_side.cc index fe45fc97497..b6f869adcea 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -1635,10 +1635,8 @@ clientProcessRequest(ConnStateData *conn, const Http1::RequestParserPointer &hp, clientSetKeepaliveFlag(http); // Let tunneling code be fully responsible for CONNECT requests - if (http->request->method == Http::METHOD_CONNECT) { - context->mayUseConnection(true); + if (http->request->method == Http::METHOD_CONNECT) conn->flags.readMore = false; - } #if USE_OPENSSL if (conn->switchedToHttps() && conn->serveDelayedError(context)) { @@ -1650,7 +1648,9 @@ clientProcessRequest(ConnStateData *conn, const Http1::RequestParserPointer &hp, /* Do we expect a request-body? */ const auto chunked = request->header.chunked(); expectBody = chunked || request->content_length > 0; - if (!context->mayUseConnection() && expectBody) { + if (http->request->method != Http::METHOD_CONNECT && !expectBody) // TODO: diff reducer + context->mayUseConnection(false); + if (http->request->method != Http::METHOD_CONNECT && expectBody) { request->body_pipe = conn->expectRequestBody( chunked ? -1 : request->content_length); @@ -1680,7 +1680,6 @@ clientProcessRequest(ConnStateData *conn, const Http1::RequestParserPointer &hp, if (!request->body_pipe->productionEnded()) { debugs(33, 5, "need more request body"); - context->mayUseConnection(true); assert(conn->flags.readMore); } } @@ -1962,6 +1961,8 @@ ConnStateData::handleRequestBodyData() consumeInput(putSize); if (!bodyPipe->mayNeedMoreData()) { + if (auto context = pipeline.front()) + context->mayUseConnection(false); // BodyPipe will clear us automagically when we produced everything bodyPipe = nullptr; } @@ -3173,7 +3174,6 @@ ConnStateData::buildFakeRequest(SBuf &useHost, const AnyP::KnownPort usePort, co clientSocketDetach, newClient, tempBuffer); stream->flags.parsed_ok = 1; // Do we need it? - stream->mayUseConnection(true); extendLifetime(); stream->registerWithConn(); @@ -3594,9 +3594,11 @@ ConnStateData::finishDechunkingRequest(bool withSuccess) Must(!bodyPipe); // we rely on it being nil after we are done with body if (withSuccess) { Must(myPipe->bodySizeKnown()); - Http::StreamPointer context = pipeline.front(); - if (context != nullptr && context->http && context->http->request) - context->http->request->setContentLength(myPipe->bodySize()); + if (auto context = pipeline.front()) { + context->mayUseConnection(false); + if (context->http && context->http->request) + context->http->request->setContentLength(myPipe->bodySize()); + } } } @@ -3914,30 +3916,28 @@ ConnStateData::terminateAll(const Error &rawError, const LogTagsErrors <e) debugs(33, 3, pipeline.count() << '/' << pipeline.nrequests << " after " << error); - if (pipeline.empty()) { + if (pipeline.empty()) bareError.update(error); // XXX: bareLogTagsErrors - } else { - // We terminate the current CONNECT/PUT/etc. context below, logging any - // error details, but that context may leave unparsed bytes behind. - // Consume them to stop checkLogging() from logging them again later. - const auto intputToConsume = -#if USE_OPENSSL - parsingTlsHandshake ? "TLS handshake" : // more specific than CONNECT -#endif - bodyPipe ? "HTTP request body" : - pipeline.back()->mayUseConnection() ? "HTTP CONNECT" : - nullptr; + // We terminate the current CONNECT/PUT/etc. context below, logging any + // error details, but that context may leave unparsed bytes behind. + // Consume them to stop checkLogging() from logging them again later. + const auto intputToConsume = + (pipeline.poppedBusy && !inBuf.isEmpty()) ? "extra connection bytes" : + (!pipeline.empty() && pipeline.mayUseConnection()) ? "still using connection" : + nullptr; + + if (!pipeline.empty()) { while (const auto context = pipeline.front()) { context->noteIoError(error, lte); context->finished(); // cleanup and self-deregister assert(context != pipeline.front()); } + } - if (intputToConsume && !inBuf.isEmpty()) { - debugs(83, 5, "forgetting client " << intputToConsume << " bytes: " << inBuf.length()); - inBuf.clear(); - } + if (intputToConsume && !inBuf.isEmpty()) { + debugs(83, 5, "forgetting client " << intputToConsume << " bytes: " << inBuf.length()); + inBuf.clear(); } clientConnection->close();