diff --git a/src/ustreamer/http/server.c b/src/ustreamer/http/server.c index 85e1a3387..2944fe4bb 100644 --- a/src/ustreamer/http/server.c +++ b/src/ustreamer/http/server.c @@ -93,7 +93,8 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c static void _http_callback_stream_error(struct bufferevent *buf_event, short what, void *v_ctx); static void _http_refresher(int fd, short event, void *v_server); -static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated); +static void _http_send_stream(us_server_s *server, bool stream_updated, bool frame_updated); +static void _http_send_snapshot(us_server_s *server); static bool _expose_frame(us_server_s *server, const us_frame_s *frame); @@ -111,6 +112,9 @@ static char *_http_get_client_hostport(struct evhttp_request *request); #define _A_EVBUFFER_ADD(x_buf, x_data, x_size) assert(!evbuffer_add(x_buf, x_data, x_size)) #define _A_EVBUFFER_ADD_PRINTF(x_buf, x_fmt, ...) assert(evbuffer_add_printf(x_buf, x_fmt, ##__VA_ARGS__) >= 0) +#define _A_ADD_HEADER(x_request, x_key, x_value) \ + assert(!evhttp_add_header(evhttp_request_get_output_headers(x_request), x_key, x_value)) + us_server_s *us_server_init(us_stream_s *stream) { us_server_exposed_s *exposed; @@ -159,6 +163,10 @@ void us_server_destroy(us_server_s *server) { libevent_global_shutdown(); # endif + US_LIST_ITERATE(run->snapshot_clients, client, { // cppcheck-suppress constStatement + free(client); + }); + US_LIST_ITERATE(run->stream_clients, client, { // cppcheck-suppress constStatement free(client->key); free(client->hostport); @@ -265,8 +273,6 @@ void us_server_loop_break(us_server_s *server) { event_base_loopbreak(server->run->base); } -#define ADD_HEADER(x_key, x_value) assert(!evhttp_add_header(evhttp_request_get_output_headers(request), x_key, x_value)) - static int _http_preprocess_request(struct evhttp_request *request, us_server_s *server) { const us_server_runtime_s *const run = server->run; @@ -276,13 +282,13 @@ static int _http_preprocess_request(struct evhttp_request *request, us_server_s const char *const cors_headers = _http_get_header(request, "Access-Control-Request-Headers"); const char *const cors_method = _http_get_header(request, "Access-Control-Request-Method"); - ADD_HEADER("Access-Control-Allow-Origin", server->allow_origin); - ADD_HEADER("Access-Control-Allow-Credentials", "true"); + _A_ADD_HEADER(request, "Access-Control-Allow-Origin", server->allow_origin); + _A_ADD_HEADER(request, "Access-Control-Allow-Credentials", "true"); if (cors_headers != NULL) { - ADD_HEADER("Access-Control-Allow-Headers", cors_headers); + _A_ADD_HEADER(request, "Access-Control-Allow-Headers", cors_headers); } if (cors_method != NULL) { - ADD_HEADER("Access-Control-Allow-Methods", cors_method); + _A_ADD_HEADER(request, "Access-Control-Allow-Methods", cors_method); } } @@ -295,7 +301,7 @@ static int _http_preprocess_request(struct evhttp_request *request, us_server_s const char *const token = _http_get_header(request, "Authorization"); if (token == NULL || strcmp(token, run->auth_token) != 0) { - ADD_HEADER("WWW-Authenticate", "Basic realm=\"Restricted area\""); + _A_ADD_HEADER(request, "WWW-Authenticate", "Basic realm=\"Restricted area\""); evhttp_send_reply(request, 401, "Unauthorized", NULL); return -1; } @@ -351,7 +357,7 @@ static void _http_callback_root(struct evhttp_request *request, void *v_server) struct evbuffer *buf; _A_EVBUFFER_NEW(buf); _A_EVBUFFER_ADD_PRINTF(buf, "%s", US_HTML_INDEX_PAGE); - ADD_HEADER("Content-Type", "text/html"); + _A_ADD_HEADER(request, "Content-Type", "text/html"); evhttp_send_reply(request, HTTP_OK, "OK", buf); evbuffer_free(buf); @@ -365,7 +371,7 @@ static void _http_callback_favicon(struct evhttp_request *request, void *v_serve struct evbuffer *buf; _A_EVBUFFER_NEW(buf); _A_EVBUFFER_ADD(buf, (const void*)US_FAVICON_ICO_DATA, US_FAVICON_ICO_DATA_SIZE); - ADD_HEADER("Content-Type", "image/x-icon"); + _A_ADD_HEADER(request, "Content-Type", "image/x-icon"); evhttp_send_reply(request, HTTP_OK, "OK", buf); evbuffer_free(buf); @@ -423,7 +429,7 @@ static void _http_callback_static(struct evhttp_request *request, void *v_server // and will close it when finished transferring data fd = -1; - ADD_HEADER("Content-Type", us_guess_mime_type(static_path)); + _A_ADD_HEADER(request, "Content-Type", us_guess_mime_type(static_path)); evhttp_send_reply(request, HTTP_OK, "OK", buf); goto cleanup; } @@ -527,62 +533,26 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server) _A_EVBUFFER_ADD_PRINTF(buf, "}}}}"); - ADD_HEADER("Content-Type", "application/json"); + _A_ADD_HEADER(request, "Content-Type", "application/json"); evhttp_send_reply(request, HTTP_OK, "OK", buf); evbuffer_free(buf); } static void _http_callback_snapshot(struct evhttp_request *request, void *v_server) { us_server_s *const server = v_server; - us_server_exposed_s *const ex = server->run->exposed; PREPROCESS_REQUEST; - struct evbuffer *buf; - _A_EVBUFFER_NEW(buf); - _A_EVBUFFER_ADD(buf, (const void*)ex->frame->data, ex->frame->used); - - ADD_HEADER("Cache-Control", "no-store, no-cache, must-revalidate, proxy-revalidate, pre-check=0, post-check=0, max-age=0"); - ADD_HEADER("Pragma", "no-cache"); - ADD_HEADER("Expires", "Mon, 3 Jan 2000 12:34:56 GMT"); - - char header_buf[256]; - -# define ADD_TIME_HEADER(x_key, x_value) { \ - US_SNPRINTF(header_buf, 255, "%.06Lf", x_value); \ - ADD_HEADER(x_key, header_buf); \ - } + us_snapshot_client_s *client; + US_CALLOC(client, 1); + client->server = server; + client->request = request; + client->request_ts = us_get_now_monotonic(); -# define ADD_UNSIGNED_HEADER(x_key, x_value) { \ - US_SNPRINTF(header_buf, 255, "%u", x_value); \ - ADD_HEADER(x_key, header_buf); \ - } - - ADD_TIME_HEADER("X-Timestamp", us_get_now_real()); - - ADD_HEADER("X-UStreamer-Online", us_bool_to_string(ex->frame->online)); - ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", ex->dropped); - ADD_UNSIGNED_HEADER("X-UStreamer-Width", ex->frame->width); - ADD_UNSIGNED_HEADER("X-UStreamer-Height", ex->frame->height); - ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", ex->frame->grab_ts); - ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", ex->frame->encode_begin_ts); - ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", ex->frame->encode_end_ts); - ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", ex->expose_begin_ts); - ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", ex->expose_cmp_ts); - ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", ex->expose_end_ts); - ADD_TIME_HEADER("X-UStreamer-Send-Timestamp", us_get_now_monotonic()); - -# undef ADD_UNSUGNED_HEADER -# undef ADD_TIME_HEADER - - ADD_HEADER("Content-Type", "image/jpeg"); - - evhttp_send_reply(request, HTTP_OK, "OK", buf); - evbuffer_free(buf); + atomic_fetch_add(&server->stream->run->http_snapshot_requested, 1); + US_LIST_APPEND(server->run->snapshot_clients, client); } -#undef ADD_HEADER - static void _http_callback_stream(struct evhttp_request *request, void *v_server) { // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2814 // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2789 @@ -826,7 +796,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha free(client); } -static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated) { +static void _http_send_stream(us_server_s *server, bool stream_updated, bool frame_updated) { us_server_runtime_s *const run = server->run; us_server_exposed_s *const ex = run->exposed; @@ -881,6 +851,63 @@ static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bo } } +static void _http_send_snapshot(us_server_s *server) { + us_server_exposed_s *const ex = server->run->exposed; + +# define ADD_TIME_HEADER(x_key, x_value) { \ + US_SNPRINTF(header_buf, 255, "%.06Lf", x_value); \ + _A_ADD_HEADER(request, x_key, header_buf); \ + } + +# define ADD_UNSIGNED_HEADER(x_key, x_value) { \ + US_SNPRINTF(header_buf, 255, "%u", x_value); \ + _A_ADD_HEADER(request, x_key, header_buf); \ + } + + US_LIST_ITERATE(server->run->snapshot_clients, client, { // cppcheck-suppress constStatement + struct evhttp_request *request = client->request; + if ( + (atomic_load(&server->stream->run->http_snapshot_requested) == 0) // Request complete + || (client->request_ts + US_MAX((uint)1, server->timeout / 2) < us_get_now_monotonic()) + ) { + struct evbuffer *buf; + _A_EVBUFFER_NEW(buf); + _A_EVBUFFER_ADD(buf, (const void*)ex->frame->data, ex->frame->used); + + _A_ADD_HEADER(request, "Cache-Control", "no-store, no-cache, must-revalidate, proxy-revalidate, pre-check=0, post-check=0, max-age=0"); + _A_ADD_HEADER(request, "Pragma", "no-cache"); + _A_ADD_HEADER(request, "Expires", "Mon, 3 Jan 2000 12:34:56 GMT"); + + char header_buf[256]; + + ADD_TIME_HEADER("X-Timestamp", us_get_now_real()); + + _A_ADD_HEADER(request, "X-UStreamer-Online", us_bool_to_string(ex->frame->online)); + ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", ex->dropped); + ADD_UNSIGNED_HEADER("X-UStreamer-Width", ex->frame->width); + ADD_UNSIGNED_HEADER("X-UStreamer-Height", ex->frame->height); + ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", ex->frame->grab_ts); + ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", ex->frame->encode_begin_ts); + ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", ex->frame->encode_end_ts); + ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", ex->expose_begin_ts); + ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", ex->expose_cmp_ts); + ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", ex->expose_end_ts); + ADD_TIME_HEADER("X-UStreamer-Send-Timestamp", us_get_now_monotonic()); + + _A_ADD_HEADER(request, "Content-Type", "image/jpeg"); + + evhttp_send_reply(request, HTTP_OK, "OK", buf); + evbuffer_free(buf); + + US_LIST_REMOVE(server->run->snapshot_clients, client); + free(client); + } + }); + +# undef ADD_UNSUGNED_HEADER +# undef ADD_TIME_HEADER +} + static void _http_refresher(int fd, short what, void *v_server) { (void)fd; (void)what; @@ -907,7 +934,8 @@ static void _http_refresher(int fd, short what, void *v_server) { stream_updated = true; } - _http_queue_send_stream(server, stream_updated, frame_updated); + _http_send_stream(server, stream_updated, frame_updated); + _http_send_snapshot(server); if ( frame_updated diff --git a/src/ustreamer/http/server.h b/src/ustreamer/http/server.h index c7ad4cb3c..9facba7e5 100644 --- a/src/ustreamer/http/server.h +++ b/src/ustreamer/http/server.h @@ -57,6 +57,14 @@ typedef struct us_stream_client_sx { US_LIST_STRUCT(struct us_stream_client_sx); } us_stream_client_s; +typedef struct us_snapshot_client_sx { + struct us_server_sx *server; + struct evhttp_request *request; + ldf request_ts; + + US_LIST_STRUCT(struct us_snapshot_client_sx); +} us_snapshot_client_s; + typedef struct { us_frame_s *frame; uint captured_fps; @@ -83,6 +91,8 @@ typedef struct { us_stream_client_s *stream_clients; uint stream_clients_count; + + us_snapshot_client_s *snapshot_clients; } us_server_runtime_s; typedef struct us_server_sx { diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 83398a7ca..1fec8669d 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -95,6 +95,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) { US_CALLOC(run, 1); US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init); atomic_init(&run->http_has_clients, false); + atomic_init(&run->http_snapshot_requested, 0); atomic_init(&run->http_last_request_ts, 0); atomic_init(&run->http_captured_fps, 0); atomic_init(&run->stop, false); @@ -267,6 +268,9 @@ static void *_jpeg_thread(void *v_ctx) { // pass } else if (ready_wr->job_timely) { _stream_expose_frame(stream, ready_job->dest); + if (atomic_load(&stream->run->http_snapshot_requested) > 0) { // Process real snapshots + atomic_fetch_sub(&stream->run->http_snapshot_requested, 1); + } US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf", ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts); } else { @@ -354,6 +358,7 @@ static bool _stream_has_any_clients(us_stream_s *stream) { const us_stream_runtime_s *const run = stream->run; return ( atomic_load(&run->http_has_clients) + || (atomic_load(&run->http_snapshot_requested) > 0) // has_clients синков НЕ обновляются в реальном времени || (stream->jpeg_sink != NULL && atomic_load(&stream->jpeg_sink->has_clients)) || (run->h264 != NULL && /*run->h264->sink == NULL ||*/ atomic_load(&run->h264->sink->has_clients)) diff --git a/src/ustreamer/stream.h b/src/ustreamer/stream.h index 1ae3eb8bb..6b9e3b746 100644 --- a/src/ustreamer/stream.h +++ b/src/ustreamer/stream.h @@ -42,6 +42,7 @@ typedef struct { us_ring_s *http_jpeg_ring; atomic_bool http_has_clients; + atomic_uint http_snapshot_requested; atomic_ullong http_last_request_ts; // Seconds atomic_uint http_captured_fps;