diff --git a/src/tcpkali_connection.h b/src/tcpkali_connection.h index 7ad3b9a..0e0b58b 100644 --- a/src/tcpkali_connection.h +++ b/src/tcpkali_connection.h @@ -20,6 +20,7 @@ struct connection { tk_io watcher; tk_timer timer; + double timer_deadline; off_t write_offset; struct transport_data_spec data; non_atomic_traffic_stats traffic_ongoing; /* Connection-local numbers */ diff --git a/src/tcpkali_engine.c b/src/tcpkali_engine.c index 164c447..38a4f88 100644 --- a/src/tcpkali_engine.c +++ b/src/tcpkali_engine.c @@ -1474,6 +1474,7 @@ conn_timer_cb(TK_P_ tk_timer *w, int UNUSED revents) { struct connection *conn = (struct connection *)((char *)w - offsetof(struct connection, timer)); + conn->timer_deadline = 0; switch(conn->conn_state) { case CSTATE_CONNECTED: switch(conn->conn_type) { @@ -1568,8 +1569,6 @@ static void connection_timer_refresh(TK_P_ struct connection *conn, double delay) { struct loop_arguments *largs = tk_userdata(TK_A); - tk_timer_stop(TK_A, &conn->timer); - switch(conn->conn_state) { case CSTATE_CONNECTED: /* Use the supplied delay */ @@ -1579,7 +1578,21 @@ connection_timer_refresh(TK_P_ struct connection *conn, double delay) { break; } + double now = tk_now(TK_A); + + /* + * Do nothing if we already have a timer which + * is going to be fired sooner than the new one + */ + if(conn->timer_deadline > 0 && conn->timer_deadline < now + delay) { + return; + } + + tk_timer_stop(TK_A, &conn->timer); + conn->timer_deadline = 0; + if(delay > 0.0) { + conn->timer_deadline = now + delay; #ifdef USE_LIBUV uv_timer_init(TK_A_ & conn->timer); uint64_t uint_delay = 1000 * delay; @@ -1602,6 +1615,7 @@ common_connection_init(TK_P_ struct connection *conn, enum conn_type conn_type, conn->conn_type = conn_type; conn->conn_state = conn_state; + conn->timer_deadline = 0; maybe_enable_dump(largs, conn_type, sockfd); @@ -1936,10 +1950,11 @@ static enum lb_return_value { * So if the reading allowance is too small, make it * large enough still to batch reads. */ - if(allowed_to_move > 1460) + if(limit.bytes_per_second < allowed_to_move) { *suggested_move_size = allowed_to_move; - else - *suggested_move_size = 1460; + } else if(limit.bytes_per_second < *suggested_move_size) { + *suggested_move_size = limit.bytes_per_second; + } return LB_PROCEED; } } else { @@ -1960,6 +1975,7 @@ static enum lb_return_value { } if(delay < 0.001) delay = 0.001; + if(delay > 1) delay = 1; connection_timer_refresh(TK_A_ conn, delay); @@ -2504,6 +2520,7 @@ connection_cb(TK_P_ tk_io *w, int revents) { * If there's nothing to write, we remove the write interest. */ tk_timer_stop(TK_A, &conn->timer); + conn->timer_deadline = 0; if((conn->data.total_size == 0) && !(conn->conn_blocked & CBLOCKED_ON_WRITE)) { conn->conn_wish &= ~CW_WRITE_INTEREST; /* Remove write interest */ update_io_interest(TK_A_ conn);