diff --git a/fw/cache.c b/fw/cache.c index e24e4901b5..7f0be77e91 100644 --- a/fw/cache.c +++ b/fw/cache.c @@ -889,7 +889,7 @@ tfw_cache_send_304(TfwHttpReq *req, TfwCacheEntry *ce) if (tfw_h2_frame_local_resp(resp, stream_id, h_len, NULL)) goto err_setup; - tfw_h2_stream_id_unlink(req, false, true); + tfw_h2_stream_id_unlink(req, false, false); tfw_h2_resp_fwd(resp); return; @@ -2805,7 +2805,7 @@ cache_req_process_node(TfwHttpReq *req, tfw_http_cache_cb_t action) * the backend), thus the stream will be finished. */ if (resp && TFW_MSG_H2(req)) { - id = tfw_h2_stream_id_unlink(req, false, true); + id = tfw_h2_stream_id_unlink(req, false, false); if (unlikely(!id)) { tfw_http_msg_free((TfwHttpMsg *)resp); tfw_http_conn_msg_free((TfwHttpMsg *)req); diff --git a/fw/http.c b/fw/http.c index 01f94038d1..d2c8a51bcc 100644 --- a/fw/http.c +++ b/fw/http.c @@ -652,7 +652,7 @@ tfw_h2_prep_resp(TfwHttpResp *resp, unsigned short status, TfwStr *msg, r = tfw_h2_frame_local_resp(resp, stream_id, hdrs_len, body); out: - tfw_h2_stream_id_unlink(req, false, true); + tfw_h2_stream_id_unlink(req, false, r); return r; } @@ -5479,7 +5479,7 @@ tfw_h2_resp_adjust_fwd(TfwHttpResp *resp) req, resp); SS_SKB_QUEUE_DUMP(&resp->msg.skb_head); - tfw_h2_stream_id_unlink(req, false, true); + tfw_h2_stream_id_unlink(req, false, false); tfw_h2_resp_fwd(resp); __tfw_h2_resp_cleanup(&cleanup); diff --git a/fw/http_frame.c b/fw/http_frame.c index 1a879996d5..63dcb7e7fb 100644 --- a/fw/http_frame.c +++ b/fw/http_frame.c @@ -219,7 +219,8 @@ tfw_h2_cleanup(void) int tfw_h2_context_init(TfwH2Ctx *ctx) { - TfwClosedQueue *hclosed_streams = &ctx->hclosed_streams; + TfwStreamQueue *hclosed_streams = &ctx->hclosed_streams; + TfwStreamQueue *closed_streams = &ctx->closed_streams; TfwSettings *lset = &ctx->lsettings; TfwSettings *rset = &ctx->rsettings; @@ -229,6 +230,7 @@ tfw_h2_context_init(TfwH2Ctx *ctx) ctx->loc_wnd = MAX_WND_SIZE; spin_lock_init(&ctx->lock); INIT_LIST_HEAD(&hclosed_streams->list); + INIT_LIST_HEAD(&closed_streams->list); lset->hdr_tbl_sz = rset->hdr_tbl_sz = HPACK_TABLE_DEF_SIZE; lset->push = rset->push = 1; @@ -596,6 +598,56 @@ tfw_h2_stream_clean(TfwH2Ctx *ctx, TfwStream *stream) --ctx->streams_num; } +/* + * Del stream from queue. + * + * NOTE: call to this procedure should be protected by special lock for + * Stream linkage protection. + */ +static inline void +__tfw_h2_stream_del_tfw_stream_queue(TfwStream *stream) +{ + if(list_empty(&stream->hcl_node)) + return; + + BUG_ON(!stream->queue); + BUG_ON(!stream->queue->num); + + list_del_init(&stream->hcl_node); + --stream->queue->num; + stream->queue = NULL; +} + +/* + * Add stream to queue. + * + * NOTE: call to this procedure should be protected by special lock for + * Stream linkage protection. + */ +static inline void +__tfw_h2_stream_add_tfw_stream_queue(TfwStreamQueue *queue, TfwStream *stream) +{ + if (!list_empty(&stream->hcl_node)) + return; + + list_add_tail(&stream->hcl_node, &queue->list); + stream->queue = queue; + ++stream->queue->num; +} + +/* + * Move stream from one queue to another. + * + * NOTE: call to this procedure should be protected by special lock for + * Stream linkage protection. + */ +static inline void +__tfw_h2_stream_move_tfw_stream_queue(TfwStreamQueue *queue, TfwStream *stream) +{ + __tfw_h2_stream_del_tfw_stream_queue(stream); + __tfw_h2_stream_add_tfw_stream_queue(queue, stream); +} + /* * Unlink the stream from a corresponding request (if linked) and from special * queue of closed streams (if it is contained there). @@ -608,10 +660,7 @@ __tfw_h2_stream_unlink(TfwH2Ctx *ctx, TfwStream *stream) { TfwHttpMsg *hmreq = (TfwHttpMsg *)stream->msg; - if (!list_empty(&stream->hcl_node)) { - list_del_init(&stream->hcl_node); - --ctx->hclosed_streams.num; - } + __tfw_h2_stream_del_tfw_stream_queue(stream); if (hmreq) { hmreq->stream = NULL; @@ -673,28 +722,46 @@ tfw_h2_conn_streams_cleanup(TfwH2Ctx *ctx) sched->streams = RB_ROOT; } -/* - * Add stream to special queue of closed streams. - * - * NOTE: call to this procedure should be protected by special lock for - * Stream linkage protection. - */ static inline void -__tfw_h2_stream_add_closed(TfwClosedQueue *hclosed_streams, TfwStream *stream) +tfw_h2_stream_add_closed(TfwH2Ctx *ctx, TfwStream *stream) { - if (!list_empty(&stream->hcl_node)) - return; - - list_add_tail(&stream->hcl_node, &hclosed_streams->list); - ++hclosed_streams->num; + spin_lock(&ctx->lock); + __tfw_h2_stream_add_tfw_stream_queue(&ctx->closed_streams, stream); + spin_unlock(&ctx->lock); } -static inline void -tfw_h2_stream_add_closed(TfwH2Ctx *ctx, TfwStream *stream) +int +tfw_h2_stream_process(TfwH2Ctx *ctx, TfwStream *stream, unsigned char type) { + TfwStreamQueue *queue; + unsigned char flags = 0; + TfwStreamFsmRes r; + + if (!stream->xmit.h_len) { + queue = &ctx->hclosed_streams; + flags |= HTTP2_F_END_HEADERS; + } + if (!stream->xmit.b_len) { + queue = &ctx->closed_streams; + flags |= HTTP2_F_END_STREAM; + } + if (type == HTTP2_RST_STREAM) + queue = &ctx->closed_streams; + spin_lock(&ctx->lock); - __tfw_h2_stream_add_closed(&ctx->hclosed_streams, stream); + r = STREAM_SEND_PROCESS(stream, type, flags); + /* + * This function is called from `xmit` callback and + * there is a chance that the stream is already in + * closed state and `STREAM_SEND_PROCESS` return + * not STREAM_FSM_RES_OK. We should not drop + * connection or skb in this case. + */ + if (r != STREAM_FSM_RES_TERM_CONN) + __tfw_h2_stream_move_tfw_stream_queue(queue, stream); spin_unlock(&ctx->lock); + + return r != STREAM_FSM_RES_TERM_CONN ? 0 : -EINVAL; } /* @@ -772,7 +839,8 @@ tfw_h2_stream_id_unlink(TfwHttpReq *req, bool send_rst, bool move_to_closed) if (move_to_closed) { T_DBG3("%s: ctx [%p], strm [%p] added to closed streams list\n", __func__, ctx, stream); - __tfw_h2_stream_add_closed(&ctx->hclosed_streams, stream); + __tfw_h2_stream_add_tfw_stream_queue(&ctx->closed_streams, + stream); } spin_unlock(&ctx->lock); @@ -802,7 +870,8 @@ tfw_h2_stream_send_process(TfwHttpReq *req, unsigned char type, if (!id) { req->stream = NULL; stream->msg = NULL; - __tfw_h2_stream_add_closed(&ctx->hclosed_streams, stream); + __tfw_h2_stream_add_tfw_stream_queue(&ctx->closed_streams, + stream); } spin_unlock(&ctx->lock); @@ -820,24 +889,25 @@ tfw_h2_closed_streams_shrink(TfwH2Ctx *ctx) { TfwStream *cur; unsigned int max_streams = ctx->lsettings.max_streams; - TfwClosedQueue *hclosed_streams = &ctx->hclosed_streams; + TfwStreamQueue *closed_streams = &ctx->closed_streams; T_DBG3("%s: ctx [%p] max_streams %u\n", __func__, ctx, max_streams); while (1) { spin_lock(&ctx->lock); - if (hclosed_streams->num <= TFW_MAX_CLOSED_STREAMS + if (closed_streams->num <= TFW_MAX_CLOSED_STREAMS || (max_streams == ctx->streams_num - && hclosed_streams->num)) + && closed_streams->num)) { spin_unlock(&ctx->lock); break; } - BUG_ON(list_empty(&hclosed_streams->list)); - cur = list_first_entry(&hclosed_streams->list, TfwStream, + BUG_ON(list_empty(&closed_streams->list)); + cur = list_first_entry(&closed_streams->list, TfwStream, hcl_node); + BUG_ON(cur->xmit.h_len || cur->xmit.b_len); __tfw_h2_stream_unlink(ctx, cur); spin_unlock(&ctx->lock); @@ -1148,7 +1218,7 @@ tfw_h2_stream_id_verify(TfwH2Ctx *ctx) * CONTINUATION and DATA frames from this stream (not pass upstairs); * to achieve such behavior (to avoid removing of such closed streams * right away), streams in these states are temporary stored in special - * queue @TfwClosedQueue. + * queue @TfwStreamQueue. */ if (ctx->lstream_id >= hdr->stream_id) { T_DBG("Invalid ID of new stream: %u stream is" diff --git a/fw/http_frame.h b/fw/http_frame.h index e99c7dc8ee..059a6ab156 100644 --- a/fw/http_frame.h +++ b/fw/http_frame.h @@ -137,22 +137,6 @@ typedef struct { unsigned int max_lhdr_sz; } TfwSettings; -/** - * Limited queue for temporary storage of half-closed streams. This structure - * provides the possibility of temporary existing in memory - for streams which - * are in HTTP2_STREAM_LOC_CLOSED or HTTP2_STREAM_REM_CLOSED states (see RFC - * 7540, section 5.1, the 'closed' paragraph). Note, that streams in - * HTTP2_STREAM_CLOSED state are not stored in this queue and must be removed - * right away. - * - * @list - list of streams which are in closed state; - * @num - number of streams in the list; - */ -typedef struct { - struct list_head list; - unsigned long num; -} TfwClosedQueue; - /** * Context for HTTP/2 frames processing. * @@ -164,7 +148,11 @@ typedef struct { * @sched - streams' priority scheduler; * @hclosed_streams - queue of half-closed streams (in * HTTP2_STREAM_LOC_CLOSED or HTTP2_STREAM_REM_CLOSED - * states), which are waiting for removal; + * states), which are waiting until all it's data will + * be sent or error occured. Then they will be moved to + * @closed_streams queue for later removal; + * @closed_streams - queue of closed streams (in HTTP2_STREAM_CLOSED + * state), which are waiting for removal; * @lstream_id - ID of last stream initiated by client and processed on * the server side; * @loc_wnd - connection's current flow controlled window; @@ -203,7 +191,8 @@ typedef struct { TfwSettings rsettings; unsigned long streams_num; TfwStreamSched sched; - TfwClosedQueue hclosed_streams; + TfwStreamQueue hclosed_streams; + TfwStreamQueue closed_streams; unsigned int lstream_id; unsigned int loc_wnd; TfwHPack hpack; @@ -234,6 +223,8 @@ unsigned int tfw_h2_stream_send_process(TfwHttpReq *req, unsigned char type, unsigned char flags); unsigned int tfw_h2_stream_id_unlink(TfwHttpReq *req, bool send_rst, bool move_to_closed); +int tfw_h2_stream_process(TfwH2Ctx *ctx, TfwStream *stream, + unsigned char type); void tfw_h2_conn_terminate_close(TfwH2Ctx *ctx, TfwH2Err err_code, bool close); int tfw_h2_send_rst_stream(TfwH2Ctx *ctx, unsigned int id, TfwH2Err err_code); diff --git a/fw/http_stream.c b/fw/http_stream.c index d312b58c65..bdb5cdc8f2 100644 --- a/fw/http_stream.c +++ b/fw/http_stream.c @@ -79,7 +79,7 @@ tfw_h2_stream_fsm(TfwStream *stream, unsigned char type, unsigned char flags, * In the sending flow this FSM procedure intended only for * HEADERS, DATA and RST_STREAM frames processing. */ - BUG_ON(!(flags & HTTP2_F_END_STREAM) + BUG_ON(!(flags & (HTTP2_F_END_STREAM | HTTP2_F_END_HEADERS)) && type != HTTP2_RST_STREAM); /* * Usually we would send HEADERS/CONTINUATION or DATA frames @@ -234,10 +234,12 @@ tfw_h2_stream_fsm(TfwStream *stream, unsigned char type, unsigned char flags, break; case HTTP2_STREAM_REM_HALF_CLOSED: - if (send && (type == HTTP2_RST_STREAM - || flags & HTTP2_F_END_STREAM)) - { - stream->state = HTTP2_STREAM_REM_CLOSED; + if (send) { + if (type == HTTP2_RST_STREAM + || flags & HTTP2_F_END_STREAM) + stream->state = HTTP2_STREAM_REM_CLOSED; + else if (!(flags & HTTP2_F_END_HEADERS)) + BUG(); break; } /* diff --git a/fw/http_stream.h b/fw/http_stream.h index 75f7dca6ef..49e3e8922f 100644 --- a/fw/http_stream.h +++ b/fw/http_stream.h @@ -110,11 +110,28 @@ typedef struct { unsigned long b_len; } TfwHttpXmit; +/** + * Limited queue for temporary storage of half-closed or pending half-closed + * streams. + * This structure provides the possibility of temporary existing in memory - + * for streams which are in HTTP2_STREAM_LOC_CLOSED or HTTP2_STREAM_REM_CLOSED + * states (see RFC 7540, section 5.1, the 'closed' paragraph). Note, that + * streams in HTTP2_STREAM_CLOSED state are not stored in this queue and must + * be removed right away. + * + * @list - list of streams which are in closed state; + * @num - number of streams in the list; + */ +typedef struct { + struct list_head list; + unsigned long num; +} TfwStreamQueue; + /** * Representation of HTTP/2 stream entity. * * @node - entry in per-connection storage of streams (red-black tree); - * @hcl_node - entry in queue of half-closed streams; + * @hcl_node - entry in queue of half-closed or closed streams; * @id - stream ID; * @state - stream's current state; * @st_lock - spinlock to synchronize concurrent access to stream FSM; @@ -122,6 +139,7 @@ typedef struct { * @weight - stream's priority weight; * @msg - message that is currently being processed; * @parser - the state of message processing; + * @queue - queue of half-closed or closed streams or NULL; * @xmit - last http2 response info, used in `xmit` callbacks; */ struct tfw_http_stream_t { @@ -134,6 +152,7 @@ struct tfw_http_stream_t { unsigned short weight; TfwMsg *msg; TfwHttpParser parser; + TfwStreamQueue *queue; TfwHttpXmit xmit; }; diff --git a/fw/sock_clnt.c b/fw/sock_clnt.c index 45c8d039fb..76d83bd61b 100644 --- a/fw/sock_clnt.c +++ b/fw/sock_clnt.c @@ -289,8 +289,12 @@ tfw_h2_sk_prepare_xmit(struct sock *sk, struct sk_buff *skb, unsigned int mss_no * We clear this flag to prevent it's copying * during skb splitting. */ - if (headers_end) + if (headers_end) { skb_clear_tfw_flag(skb, SS_F_HTTT2_FRAME_HEADERS); + r = tfw_h2_stream_process(h2, stream, HTTP2_HEADERS); + if (unlikely(r)) + goto ret; + } } __ADJUST_T_TZ(); @@ -313,8 +317,12 @@ tfw_h2_sk_prepare_xmit(struct sock *sk, struct sk_buff *skb, unsigned int mss_no * We clear this flag to prevent it's copying * during skb splitting. */ - if (data_end) + if (data_end) { skb_clear_tfw_flag(skb, SS_F_HTTT2_FRAME_DATA); + r = tfw_h2_stream_process(h2, stream, HTTP2_DATA); + if (unlikely(r)) + goto ret; + } } __ADJUST_T_TZ(); @@ -341,13 +349,16 @@ tfw_h2_sk_prepare_xmit(struct sock *sk, struct sk_buff *skb, unsigned int mss_no r = r ? r : -ENOMEM; } - if (unlikely(r) && r != -ENOMEM) + if (unlikely(r) && r != -ENOMEM) { + if (stream) + tfw_h2_stream_process(h2, stream, HTTP2_RST_STREAM); /* * We can not send unencrypted data and can not normally close * the socket with FIN since we're in progress on sending from * the write queue. */ ss_close(sk, SS_F_ABORT); + } return r;