Skip to content

Commit

Permalink
Implement delayed stream deleting.
Browse files Browse the repository at this point in the history
Previously, when we forward response, we put appropriate
stream into `hclosed_streams` queue and if count of
such streams become greater then TFW_MAX_CLOSED_STREAMS
we delete streams from this queue. But now we should not
delete such streams, because they can used in `xmit`
callback, that's why we implement additional queue for
streams. Now when we forward response, we put appropriate
stream into `hclosed_streams` queue and then when all
response data will be sent, we move this stream to
`closed_streams` queue for further deleting.

Part of #1394
  • Loading branch information
EvgeniiMekhanik committed Apr 17, 2023
1 parent 29761f2 commit 4efdb06
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 59 deletions.
4 changes: 2 additions & 2 deletions fw/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ tfw_cache_send_304(TfwHttpReq *req, TfwCacheEntry *ce)
if (tfw_h2_frame_local_resp(resp, stream_id, h_len, NULL))
goto err_setup;

tfw_h2_stream_id_unlink(req, false, true);
tfw_h2_stream_id_unlink(req, false, false);
tfw_h2_resp_fwd(resp);

return;
Expand Down Expand Up @@ -2805,7 +2805,7 @@ cache_req_process_node(TfwHttpReq *req, tfw_http_cache_cb_t action)
* the backend), thus the stream will be finished.
*/
if (resp && TFW_MSG_H2(req)) {
id = tfw_h2_stream_id_unlink(req, false, true);
id = tfw_h2_stream_id_unlink(req, false, false);
if (unlikely(!id)) {
tfw_http_msg_free((TfwHttpMsg *)resp);
tfw_http_conn_msg_free((TfwHttpMsg *)req);
Expand Down
4 changes: 2 additions & 2 deletions fw/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ tfw_h2_prep_resp(TfwHttpResp *resp, unsigned short status, TfwStr *msg,
r = tfw_h2_frame_local_resp(resp, stream_id, hdrs_len, body);

out:
tfw_h2_stream_id_unlink(req, false, true);
tfw_h2_stream_id_unlink(req, false, r);
return r;
}

Expand Down Expand Up @@ -5479,7 +5479,7 @@ tfw_h2_resp_adjust_fwd(TfwHttpResp *resp)
req, resp);
SS_SKB_QUEUE_DUMP(&resp->msg.skb_head);

tfw_h2_stream_id_unlink(req, false, true);
tfw_h2_stream_id_unlink(req, false, false);
tfw_h2_resp_fwd(resp);

__tfw_h2_resp_cleanup(&cleanup);
Expand Down
126 changes: 98 additions & 28 deletions fw/http_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ tfw_h2_cleanup(void)
int
tfw_h2_context_init(TfwH2Ctx *ctx)
{
TfwClosedQueue *hclosed_streams = &ctx->hclosed_streams;
TfwStreamQueue *hclosed_streams = &ctx->hclosed_streams;
TfwStreamQueue *closed_streams = &ctx->closed_streams;
TfwSettings *lset = &ctx->lsettings;
TfwSettings *rset = &ctx->rsettings;

Expand All @@ -229,6 +230,7 @@ tfw_h2_context_init(TfwH2Ctx *ctx)
ctx->loc_wnd = MAX_WND_SIZE;
spin_lock_init(&ctx->lock);
INIT_LIST_HEAD(&hclosed_streams->list);
INIT_LIST_HEAD(&closed_streams->list);

lset->hdr_tbl_sz = rset->hdr_tbl_sz = HPACK_TABLE_DEF_SIZE;
lset->push = rset->push = 1;
Expand Down Expand Up @@ -596,6 +598,56 @@ tfw_h2_stream_clean(TfwH2Ctx *ctx, TfwStream *stream)
--ctx->streams_num;
}

/*
* Del stream from queue.
*
* NOTE: call to this procedure should be protected by special lock for
* Stream linkage protection.
*/
static inline void
__tfw_h2_stream_del_tfw_stream_queue(TfwStream *stream)
{
if(list_empty(&stream->hcl_node))
return;

BUG_ON(!stream->queue);
BUG_ON(!stream->queue->num);

list_del_init(&stream->hcl_node);
--stream->queue->num;
stream->queue = NULL;
}

/*
* Add stream to queue.
*
* NOTE: call to this procedure should be protected by special lock for
* Stream linkage protection.
*/
static inline void
__tfw_h2_stream_add_tfw_stream_queue(TfwStreamQueue *queue, TfwStream *stream)
{
if (!list_empty(&stream->hcl_node))
return;

list_add_tail(&stream->hcl_node, &queue->list);
stream->queue = queue;
++stream->queue->num;
}

/*
* Move stream from one queue to another.
*
* NOTE: call to this procedure should be protected by special lock for
* Stream linkage protection.
*/
static inline void
__tfw_h2_stream_move_tfw_stream_queue(TfwStreamQueue *queue, TfwStream *stream)
{
__tfw_h2_stream_del_tfw_stream_queue(stream);
__tfw_h2_stream_add_tfw_stream_queue(queue, stream);
}

/*
* Unlink the stream from a corresponding request (if linked) and from special
* queue of closed streams (if it is contained there).
Expand All @@ -608,10 +660,7 @@ __tfw_h2_stream_unlink(TfwH2Ctx *ctx, TfwStream *stream)
{
TfwHttpMsg *hmreq = (TfwHttpMsg *)stream->msg;

if (!list_empty(&stream->hcl_node)) {
list_del_init(&stream->hcl_node);
--ctx->hclosed_streams.num;
}
__tfw_h2_stream_del_tfw_stream_queue(stream);

if (hmreq) {
hmreq->stream = NULL;
Expand Down Expand Up @@ -673,28 +722,46 @@ tfw_h2_conn_streams_cleanup(TfwH2Ctx *ctx)
sched->streams = RB_ROOT;
}

/*
* Add stream to special queue of closed streams.
*
* NOTE: call to this procedure should be protected by special lock for
* Stream linkage protection.
*/
static inline void
__tfw_h2_stream_add_closed(TfwClosedQueue *hclosed_streams, TfwStream *stream)
tfw_h2_stream_add_closed(TfwH2Ctx *ctx, TfwStream *stream)
{
if (!list_empty(&stream->hcl_node))
return;

list_add_tail(&stream->hcl_node, &hclosed_streams->list);
++hclosed_streams->num;
spin_lock(&ctx->lock);
__tfw_h2_stream_add_tfw_stream_queue(&ctx->closed_streams, stream);
spin_unlock(&ctx->lock);
}

static inline void
tfw_h2_stream_add_closed(TfwH2Ctx *ctx, TfwStream *stream)
int
tfw_h2_stream_process(TfwH2Ctx *ctx, TfwStream *stream, unsigned char type)
{
TfwStreamQueue *queue;
unsigned char flags = 0;
TfwStreamFsmRes r;

if (!stream->xmit.h_len) {
queue = &ctx->hclosed_streams;
flags |= HTTP2_F_END_HEADERS;
}
if (!stream->xmit.b_len) {
queue = &ctx->closed_streams;
flags |= HTTP2_F_END_STREAM;
}
if (type == HTTP2_RST_STREAM)
queue = &ctx->closed_streams;

spin_lock(&ctx->lock);
__tfw_h2_stream_add_closed(&ctx->hclosed_streams, stream);
r = STREAM_SEND_PROCESS(stream, type, flags);
/*
* This function is called from `xmit` callback and
* there is a chance that the stream is already in
* closed state and `STREAM_SEND_PROCESS` return
* not STREAM_FSM_RES_OK. We should not drop
* connection or skb in this case.
*/
if (r != STREAM_FSM_RES_TERM_CONN)
__tfw_h2_stream_move_tfw_stream_queue(queue, stream);
spin_unlock(&ctx->lock);

return r != STREAM_FSM_RES_TERM_CONN ? 0 : -EINVAL;
}

/*
Expand Down Expand Up @@ -772,7 +839,8 @@ tfw_h2_stream_id_unlink(TfwHttpReq *req, bool send_rst, bool move_to_closed)
if (move_to_closed) {
T_DBG3("%s: ctx [%p], strm [%p] added to closed streams list\n",
__func__, ctx, stream);
__tfw_h2_stream_add_closed(&ctx->hclosed_streams, stream);
__tfw_h2_stream_add_tfw_stream_queue(&ctx->closed_streams,
stream);
}

spin_unlock(&ctx->lock);
Expand Down Expand Up @@ -802,7 +870,8 @@ tfw_h2_stream_send_process(TfwHttpReq *req, unsigned char type,
if (!id) {
req->stream = NULL;
stream->msg = NULL;
__tfw_h2_stream_add_closed(&ctx->hclosed_streams, stream);
__tfw_h2_stream_add_tfw_stream_queue(&ctx->closed_streams,
stream);
}

spin_unlock(&ctx->lock);
Expand All @@ -820,24 +889,25 @@ tfw_h2_closed_streams_shrink(TfwH2Ctx *ctx)
{
TfwStream *cur;
unsigned int max_streams = ctx->lsettings.max_streams;
TfwClosedQueue *hclosed_streams = &ctx->hclosed_streams;
TfwStreamQueue *closed_streams = &ctx->closed_streams;

T_DBG3("%s: ctx [%p] max_streams %u\n", __func__, ctx, max_streams);

while (1) {
spin_lock(&ctx->lock);

if (hclosed_streams->num <= TFW_MAX_CLOSED_STREAMS
if (closed_streams->num <= TFW_MAX_CLOSED_STREAMS
|| (max_streams == ctx->streams_num
&& hclosed_streams->num))
&& closed_streams->num))
{
spin_unlock(&ctx->lock);
break;
}

BUG_ON(list_empty(&hclosed_streams->list));
cur = list_first_entry(&hclosed_streams->list, TfwStream,
BUG_ON(list_empty(&closed_streams->list));
cur = list_first_entry(&closed_streams->list, TfwStream,
hcl_node);
BUG_ON(cur->xmit.h_len || cur->xmit.b_len);
__tfw_h2_stream_unlink(ctx, cur);

spin_unlock(&ctx->lock);
Expand Down Expand Up @@ -1148,7 +1218,7 @@ tfw_h2_stream_id_verify(TfwH2Ctx *ctx)
* CONTINUATION and DATA frames from this stream (not pass upstairs);
* to achieve such behavior (to avoid removing of such closed streams
* right away), streams in these states are temporary stored in special
* queue @TfwClosedQueue.
* queue @TfwStreamQueue.
*/
if (ctx->lstream_id >= hdr->stream_id) {
T_DBG("Invalid ID of new stream: %u stream is"
Expand Down
27 changes: 9 additions & 18 deletions fw/http_frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,22 +137,6 @@ typedef struct {
unsigned int max_lhdr_sz;
} TfwSettings;

/**
* Limited queue for temporary storage of half-closed streams. This structure
* provides the possibility of temporary existing in memory - for streams which
* are in HTTP2_STREAM_LOC_CLOSED or HTTP2_STREAM_REM_CLOSED states (see RFC
* 7540, section 5.1, the 'closed' paragraph). Note, that streams in
* HTTP2_STREAM_CLOSED state are not stored in this queue and must be removed
* right away.
*
* @list - list of streams which are in closed state;
* @num - number of streams in the list;
*/
typedef struct {
struct list_head list;
unsigned long num;
} TfwClosedQueue;

/**
* Context for HTTP/2 frames processing.
*
Expand All @@ -164,7 +148,11 @@ typedef struct {
* @sched - streams' priority scheduler;
* @hclosed_streams - queue of half-closed streams (in
* HTTP2_STREAM_LOC_CLOSED or HTTP2_STREAM_REM_CLOSED
* states), which are waiting for removal;
* states), which are waiting until all it's data will
* be sent or error occured. Then they will be moved to
* @closed_streams queue for later removal;
* @closed_streams - queue of closed streams (in HTTP2_STREAM_CLOSED
* state), which are waiting for removal;
* @lstream_id - ID of last stream initiated by client and processed on
* the server side;
* @loc_wnd - connection's current flow controlled window;
Expand Down Expand Up @@ -203,7 +191,8 @@ typedef struct {
TfwSettings rsettings;
unsigned long streams_num;
TfwStreamSched sched;
TfwClosedQueue hclosed_streams;
TfwStreamQueue hclosed_streams;
TfwStreamQueue closed_streams;
unsigned int lstream_id;
unsigned int loc_wnd;
TfwHPack hpack;
Expand Down Expand Up @@ -234,6 +223,8 @@ unsigned int tfw_h2_stream_send_process(TfwHttpReq *req, unsigned char type,
unsigned char flags);
unsigned int tfw_h2_stream_id_unlink(TfwHttpReq *req, bool send_rst,
bool move_to_closed);
int tfw_h2_stream_process(TfwH2Ctx *ctx, TfwStream *stream,
unsigned char type);
void tfw_h2_conn_terminate_close(TfwH2Ctx *ctx, TfwH2Err err_code, bool close);
int tfw_h2_send_rst_stream(TfwH2Ctx *ctx, unsigned int id, TfwH2Err err_code);

Expand Down
12 changes: 7 additions & 5 deletions fw/http_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ tfw_h2_stream_fsm(TfwStream *stream, unsigned char type, unsigned char flags,
* In the sending flow this FSM procedure intended only for
* HEADERS, DATA and RST_STREAM frames processing.
*/
BUG_ON(!(flags & HTTP2_F_END_STREAM)
BUG_ON(!(flags & (HTTP2_F_END_STREAM | HTTP2_F_END_HEADERS))
&& type != HTTP2_RST_STREAM);
/*
* Usually we would send HEADERS/CONTINUATION or DATA frames
Expand Down Expand Up @@ -234,10 +234,12 @@ tfw_h2_stream_fsm(TfwStream *stream, unsigned char type, unsigned char flags,
break;

case HTTP2_STREAM_REM_HALF_CLOSED:
if (send && (type == HTTP2_RST_STREAM
|| flags & HTTP2_F_END_STREAM))
{
stream->state = HTTP2_STREAM_REM_CLOSED;
if (send) {
if (type == HTTP2_RST_STREAM
|| flags & HTTP2_F_END_STREAM)
stream->state = HTTP2_STREAM_REM_CLOSED;
else if (!(flags & HTTP2_F_END_HEADERS))
BUG();
break;
}
/*
Expand Down
21 changes: 20 additions & 1 deletion fw/http_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,36 @@ typedef struct {
unsigned long b_len;
} TfwHttpXmit;

/**
* Limited queue for temporary storage of half-closed or pending half-closed
* streams.
* This structure provides the possibility of temporary existing in memory -
* for streams which are in HTTP2_STREAM_LOC_CLOSED or HTTP2_STREAM_REM_CLOSED
* states (see RFC 7540, section 5.1, the 'closed' paragraph). Note, that
* streams in HTTP2_STREAM_CLOSED state are not stored in this queue and must
* be removed right away.
*
* @list - list of streams which are in closed state;
* @num - number of streams in the list;
*/
typedef struct {
struct list_head list;
unsigned long num;
} TfwStreamQueue;

/**
* Representation of HTTP/2 stream entity.
*
* @node - entry in per-connection storage of streams (red-black tree);
* @hcl_node - entry in queue of half-closed streams;
* @hcl_node - entry in queue of half-closed or closed streams;
* @id - stream ID;
* @state - stream's current state;
* @st_lock - spinlock to synchronize concurrent access to stream FSM;
* @loc_wnd - stream's current flow controlled window;
* @weight - stream's priority weight;
* @msg - message that is currently being processed;
* @parser - the state of message processing;
* @queue - queue of half-closed or closed streams or NULL;
* @xmit - last http2 response info, used in `xmit` callbacks;
*/
struct tfw_http_stream_t {
Expand All @@ -134,6 +152,7 @@ struct tfw_http_stream_t {
unsigned short weight;
TfwMsg *msg;
TfwHttpParser parser;
TfwStreamQueue *queue;
TfwHttpXmit xmit;
};

Expand Down
Loading

0 comments on commit 4efdb06

Please sign in to comment.