Skip to content

Commit

Permalink
add variable to record the total size on one websocket connection
Browse files Browse the repository at this point in the history
  • Loading branch information
qdongxu committed Aug 24, 2023
1 parent 22d57d3 commit 0b0f120
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 21 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Here is a list of variables you can use in log format string:

* $ws_opcode - websocket packet opcode. Look into https://tools.ietf.org/html/rfc6455 Section 5.2, Base Framing Protocol.
* $ws_payload_size - Websocket packet size without protocol specific data. Only data that been sent or received by the client
* $ws_total_payload_size - total packet size on a Websocket connection, without protocol specific data. Only data that been sent or received by the client
* $ws_packet_source - Could be "client" if packet has been sent by the user or "upstream" if it has been received from the server
* $ws_conn_age - Number of seconds connection is alive
* $time_local - Nginx local time, date and timezone
Expand Down
74 changes: 53 additions & 21 deletions ngx_http_websocket_stat_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ websocket_log(char *str)
void
ws_do_log(compiled_template *template, ngx_http_request_t *r, void *ctx)
{
if (ws_log) {
if (ws_log && template) {
char *log_line = apply_template(template, r, ctx);
websocket_log(log_line);
free(log_line);
Expand Down Expand Up @@ -330,6 +330,7 @@ my_send(ngx_connection_t *c, u_char *buf, size_t size)
ngx_atomic_fetch_add(frame_counter->frames, 1);
ngx_atomic_fetch_add(frame_counter->total_payload_size,
ctx->frame_counter.current_payload_size);
ctx->frame_counter.total_payload_size += ctx->frame_counter.current_payload_size;
ws_do_log(log_template, r, &template_ctx);
}
}
Expand Down Expand Up @@ -371,6 +372,7 @@ my_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_atomic_fetch_add(frame_counter->frames, 1);
ngx_atomic_fetch_add(frame_counter->total_payload_size,
ctx->frame_counter.current_payload_size);
ctx->frame_counter.total_payload_size += ctx->frame_counter.current_payload_size;
ws_do_log(log_template, r, &template_ctx);
}
}
Expand Down Expand Up @@ -408,6 +410,7 @@ ngx_http_websocket_stat_body_filter(ngx_http_request_t *r, ngx_chain_t *in)
ctx->connection_id.data = ngx_pcalloc(r->pool, UID_LENGTH + 1);
ctx->connection_id.len = UID_LENGTH;
memcpy(ctx->connection_id.data, request_id_str, UID_LENGTH + 1);
ctx->frame_counter.total_payload_size = 0;

ws_do_log(log_open_template, r, &template_ctx);
ngx_http_set_ctx(r, ctx, ngx_http_websocket_stat_module);
Expand All @@ -428,16 +431,16 @@ ngx_http_websocket_stat_body_filter(ngx_http_request_t *r, ngx_chain_t *in)
return ngx_http_next_body_filter(r, in);
}

char buff[100];

const char *
ws_packet_type(ngx_http_request_t *r, void *data)
{
template_ctx_s *ctx = data;
ngx_frame_counter_t *frame_cntr = &(ctx->ws_ctx->frame_counter);
if (!ctx || !frame_cntr)
return UNKNOWN_VAR;
sprintf(buff, "%d", frame_cntr->current_frame_type);

char* buff = ngx_pcalloc(r->pool, NGX_ATOMIC_T_LEN);
snprintf(buff, NGX_ATOMIC_T_LEN, "%d", frame_cntr->current_frame_type);
return buff;
}

Expand All @@ -448,7 +451,22 @@ ws_packet_size(ngx_http_request_t *r, void *data)
ngx_frame_counter_t *frame_cntr = &ctx->ws_ctx->frame_counter;
if (!ctx || !frame_cntr)
return UNKNOWN_VAR;
sprintf(buff, "%lu", frame_cntr->current_payload_size);

char* buff = ngx_pcalloc(r->pool, NGX_ATOMIC_T_LEN);
snprintf(buff, NGX_ATOMIC_T_LEN, "%lu", frame_cntr->current_payload_size);
return (char *)buff;
}

const char *
ws_total_payload_size(ngx_http_request_t *r, void *data)
{
template_ctx_s *ctx = data;
ngx_frame_counter_t *frame_cntr = &ctx->ws_ctx->frame_counter;
if (!ctx || !frame_cntr)
return UNKNOWN_VAR;

char* buff = ngx_pcalloc(r->pool, NGX_ATOMIC_T_LEN);
snprintf(buff, NGX_ATOMIC_T_LEN, "%lu", frame_cntr->total_payload_size);
return (char *)buff;
}

Expand All @@ -475,6 +493,7 @@ get_core_var(ngx_http_request_t *r, const char *variable)
key = ngx_hash(key, *(variable++));

vv = ngx_http_get_variable(r, &var, key);
char* buff = ngx_pcalloc(r->pool, vv->len + 1);
memcpy(buff, vv->data, vv->len);
buff[vv->len] = '\0';
return buff;
Expand All @@ -486,20 +505,26 @@ ws_connection_age(ngx_http_request_t *r, void *data)
template_ctx_s *ctx = data;
if (!ctx || !ctx->ws_ctx)
return UNKNOWN_VAR;
sprintf(buff, "%lu", ngx_time() - ctx->ws_ctx->ws_conn_start_time);

char* buff = ngx_pcalloc(r->pool, NGX_ATOMIC_T_LEN);
snprintf(buff, NGX_ATOMIC_T_LEN, "%lu", ngx_time() - ctx->ws_ctx->ws_conn_start_time);

return (char *)buff;
}

const char *
local_time(ngx_http_request_t *r, void *data)
{
return memcpy(buff, ngx_cached_http_time.data, ngx_cached_http_time.len);
char* buff = ngx_pcalloc(r->pool, ngx_cached_http_time.len + 1);
memcpy(buff, ngx_cached_http_time.data, ngx_cached_http_time.len);
buff[ngx_cached_http_time.len] = '\0';
return buff;
}

const char *
remote_ip(ngx_http_request_t *r, void *data)
{
char* buff = ngx_pcalloc(r->pool, r->connection->addr_text.len + 1);
memcpy(buff, r->connection->addr_text.data, r->connection->addr_text.len);
buff[r->connection->addr_text.len] = '\0';

Expand Down Expand Up @@ -545,6 +570,7 @@ GEN_CORE_GET_FUNC(server_port, "server_port")
const template_variable variables[] = {
{VAR_NAME("$ws_opcode"), sizeof("ping") - 1, ws_packet_type},
{VAR_NAME("$ws_payload_size"), NGX_SIZE_T_LEN, ws_packet_size},
{VAR_NAME("$ws_total_payload_size"), NGX_SIZE_T_LEN, ws_total_payload_size},
{VAR_NAME("$ws_packet_source"), sizeof("upstream") - 1, ws_packet_source},
{VAR_NAME("$ws_conn_age"), NGX_SIZE_T_LEN, ws_connection_age},
{VAR_NAME("$time_local"), sizeof("Mon, 23 Oct 2017 11:27:42 GMT") - 1,
Expand Down Expand Up @@ -585,12 +611,18 @@ ngx_http_ws_log_format(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "Wrong argument number");
return NGX_CONF_ERROR;
}

if (cf->args->nelts == 2) {
log_template =
compile_template((char *)args[1].data, variables, cf->pool);
return NGX_CONF_OK;

}
if (strcmp((char *)args[1].data, "close") == 0) {
if (strcmp((char *)args[1].data, "packet") == 0) {
log_template =
compile_template((char *)args[2].data, variables, cf->pool);
return NGX_CONF_OK;
} else if (strcmp((char *)args[1].data, "close") == 0) {
log_close_template =
compile_template((char *)args[2].data, variables, cf->pool);
return NGX_CONF_OK;
Expand Down Expand Up @@ -700,7 +732,7 @@ complete_ws_handshake(ngx_connection_t *connection, const char *ws_key)
Base64Encode(hash, SHA_DIGEST_LENGTH, access_key, ACCEPT_SIZE);
access_key[ACCEPT_SIZE] = '\0';
char resp[256];
sprintf(resp, resp_template, access_key);
snprintf(resp, 256, resp_template, access_key);
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"Websocket connection closed");
connection->send(connection, (unsigned char *)resp, strlen(resp));
Expand Down Expand Up @@ -746,18 +778,18 @@ ngx_http_websocket_stat_init(ngx_conf_t *cf)
ngx_http_next_body_filter = ngx_http_top_body_filter;
ngx_http_top_body_filter = ngx_http_websocket_stat_body_filter;

if (!log_template) {
log_template =
compile_template(default_log_template_str, variables, cf->pool);
}
if (!log_open_template) {
log_open_template = compile_template(default_open_log_template_str,
variables, cf->pool);
}
if (!log_close_template) {
log_close_template = compile_template(default_close_log_template_str,
variables, cf->pool);
}
// if (!log_template) {
// log_template =
// compile_template(default_log_template_str, variables, cf->pool);
// }
// if (!log_open_template) {
// log_open_template = compile_template(default_open_log_template_str,
// variables, cf->pool);
// }
// if (!log_close_template) {
// log_close_template = compile_template(default_close_log_template_str,
// variables, cf->pool);
// }

ngx_http_handler_pt *h;
ngx_http_core_main_conf_t *cmcf;
Expand Down

0 comments on commit 0b0f120

Please sign in to comment.