Skip to content

Commit

Permalink
Issue #228: Request fresh snapshot from jpeg encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Mar 3, 2024
1 parent 8cb6fc4 commit c24d633
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 57 deletions.
142 changes: 85 additions & 57 deletions src/ustreamer/http/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
static void _http_callback_stream_error(struct bufferevent *buf_event, short what, void *v_ctx);

static void _http_refresher(int fd, short event, void *v_server);
static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated);
static void _http_send_stream(us_server_s *server, bool stream_updated, bool frame_updated);
static void _http_send_snapshot(us_server_s *server);

static bool _expose_frame(us_server_s *server, const us_frame_s *frame);

Expand All @@ -111,6 +112,9 @@ static char *_http_get_client_hostport(struct evhttp_request *request);
#define _A_EVBUFFER_ADD(x_buf, x_data, x_size) assert(!evbuffer_add(x_buf, x_data, x_size))
#define _A_EVBUFFER_ADD_PRINTF(x_buf, x_fmt, ...) assert(evbuffer_add_printf(x_buf, x_fmt, ##__VA_ARGS__) >= 0)

#define _A_ADD_HEADER(x_request, x_key, x_value) \
assert(!evhttp_add_header(evhttp_request_get_output_headers(x_request), x_key, x_value))


us_server_s *us_server_init(us_stream_s *stream) {
us_server_exposed_s *exposed;
Expand Down Expand Up @@ -159,6 +163,10 @@ void us_server_destroy(us_server_s *server) {
libevent_global_shutdown();
# endif

US_LIST_ITERATE(run->snapshot_clients, client, { // cppcheck-suppress constStatement
free(client);
});

US_LIST_ITERATE(run->stream_clients, client, { // cppcheck-suppress constStatement
free(client->key);
free(client->hostport);
Expand Down Expand Up @@ -265,8 +273,6 @@ void us_server_loop_break(us_server_s *server) {
event_base_loopbreak(server->run->base);
}

#define ADD_HEADER(x_key, x_value) assert(!evhttp_add_header(evhttp_request_get_output_headers(request), x_key, x_value))

static int _http_preprocess_request(struct evhttp_request *request, us_server_s *server) {
const us_server_runtime_s *const run = server->run;

Expand All @@ -276,13 +282,13 @@ static int _http_preprocess_request(struct evhttp_request *request, us_server_s
const char *const cors_headers = _http_get_header(request, "Access-Control-Request-Headers");
const char *const cors_method = _http_get_header(request, "Access-Control-Request-Method");

ADD_HEADER("Access-Control-Allow-Origin", server->allow_origin);
ADD_HEADER("Access-Control-Allow-Credentials", "true");
_A_ADD_HEADER(request, "Access-Control-Allow-Origin", server->allow_origin);
_A_ADD_HEADER(request, "Access-Control-Allow-Credentials", "true");
if (cors_headers != NULL) {
ADD_HEADER("Access-Control-Allow-Headers", cors_headers);
_A_ADD_HEADER(request, "Access-Control-Allow-Headers", cors_headers);
}
if (cors_method != NULL) {
ADD_HEADER("Access-Control-Allow-Methods", cors_method);
_A_ADD_HEADER(request, "Access-Control-Allow-Methods", cors_method);
}
}

Expand All @@ -295,7 +301,7 @@ static int _http_preprocess_request(struct evhttp_request *request, us_server_s
const char *const token = _http_get_header(request, "Authorization");

if (token == NULL || strcmp(token, run->auth_token) != 0) {
ADD_HEADER("WWW-Authenticate", "Basic realm=\"Restricted area\"");
_A_ADD_HEADER(request, "WWW-Authenticate", "Basic realm=\"Restricted area\"");
evhttp_send_reply(request, 401, "Unauthorized", NULL);
return -1;
}
Expand Down Expand Up @@ -351,7 +357,7 @@ static void _http_callback_root(struct evhttp_request *request, void *v_server)
struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD_PRINTF(buf, "%s", US_HTML_INDEX_PAGE);
ADD_HEADER("Content-Type", "text/html");
_A_ADD_HEADER(request, "Content-Type", "text/html");
evhttp_send_reply(request, HTTP_OK, "OK", buf);

evbuffer_free(buf);
Expand All @@ -365,7 +371,7 @@ static void _http_callback_favicon(struct evhttp_request *request, void *v_serve
struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD(buf, (const void*)US_FAVICON_ICO_DATA, US_FAVICON_ICO_DATA_SIZE);
ADD_HEADER("Content-Type", "image/x-icon");
_A_ADD_HEADER(request, "Content-Type", "image/x-icon");
evhttp_send_reply(request, HTTP_OK, "OK", buf);

evbuffer_free(buf);
Expand Down Expand Up @@ -423,7 +429,7 @@ static void _http_callback_static(struct evhttp_request *request, void *v_server
// and will close it when finished transferring data
fd = -1;

ADD_HEADER("Content-Type", us_guess_mime_type(static_path));
_A_ADD_HEADER(request, "Content-Type", us_guess_mime_type(static_path));
evhttp_send_reply(request, HTTP_OK, "OK", buf);
goto cleanup;
}
Expand Down Expand Up @@ -527,62 +533,26 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)

_A_EVBUFFER_ADD_PRINTF(buf, "}}}}");

ADD_HEADER("Content-Type", "application/json");
_A_ADD_HEADER(request, "Content-Type", "application/json");
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
}

static void _http_callback_snapshot(struct evhttp_request *request, void *v_server) {
us_server_s *const server = v_server;
us_server_exposed_s *const ex = server->run->exposed;

PREPROCESS_REQUEST;

struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD(buf, (const void*)ex->frame->data, ex->frame->used);

ADD_HEADER("Cache-Control", "no-store, no-cache, must-revalidate, proxy-revalidate, pre-check=0, post-check=0, max-age=0");
ADD_HEADER("Pragma", "no-cache");
ADD_HEADER("Expires", "Mon, 3 Jan 2000 12:34:56 GMT");

char header_buf[256];

# define ADD_TIME_HEADER(x_key, x_value) { \
US_SNPRINTF(header_buf, 255, "%.06Lf", x_value); \
ADD_HEADER(x_key, header_buf); \
}
us_snapshot_client_s *client;
US_CALLOC(client, 1);
client->server = server;
client->request = request;
client->request_ts = us_get_now_monotonic();

# define ADD_UNSIGNED_HEADER(x_key, x_value) { \
US_SNPRINTF(header_buf, 255, "%u", x_value); \
ADD_HEADER(x_key, header_buf); \
}

ADD_TIME_HEADER("X-Timestamp", us_get_now_real());

ADD_HEADER("X-UStreamer-Online", us_bool_to_string(ex->frame->online));
ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", ex->dropped);
ADD_UNSIGNED_HEADER("X-UStreamer-Width", ex->frame->width);
ADD_UNSIGNED_HEADER("X-UStreamer-Height", ex->frame->height);
ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", ex->frame->grab_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", ex->frame->encode_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", ex->frame->encode_end_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", ex->expose_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", ex->expose_cmp_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", ex->expose_end_ts);
ADD_TIME_HEADER("X-UStreamer-Send-Timestamp", us_get_now_monotonic());

# undef ADD_UNSUGNED_HEADER
# undef ADD_TIME_HEADER

ADD_HEADER("Content-Type", "image/jpeg");

evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
atomic_fetch_add(&server->stream->run->http_snapshot_requested, 1);
US_LIST_APPEND(server->run->snapshot_clients, client);
}

#undef ADD_HEADER

static void _http_callback_stream(struct evhttp_request *request, void *v_server) {
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2814
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2789
Expand Down Expand Up @@ -826,7 +796,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha
free(client);
}

static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated) {
static void _http_send_stream(us_server_s *server, bool stream_updated, bool frame_updated) {
us_server_runtime_s *const run = server->run;
us_server_exposed_s *const ex = run->exposed;

Expand Down Expand Up @@ -881,6 +851,63 @@ static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bo
}
}

static void _http_send_snapshot(us_server_s *server) {
us_server_exposed_s *const ex = server->run->exposed;

# define ADD_TIME_HEADER(x_key, x_value) { \
US_SNPRINTF(header_buf, 255, "%.06Lf", x_value); \
_A_ADD_HEADER(request, x_key, header_buf); \
}

# define ADD_UNSIGNED_HEADER(x_key, x_value) { \
US_SNPRINTF(header_buf, 255, "%u", x_value); \
_A_ADD_HEADER(request, x_key, header_buf); \
}

US_LIST_ITERATE(server->run->snapshot_clients, client, { // cppcheck-suppress constStatement
struct evhttp_request *request = client->request;
if (
(atomic_load(&server->stream->run->http_snapshot_requested) == 0) // Request complete
|| (client->request_ts + US_MAX((uint)1, server->timeout / 2) < us_get_now_monotonic())
) {
struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD(buf, (const void*)ex->frame->data, ex->frame->used);

_A_ADD_HEADER(request, "Cache-Control", "no-store, no-cache, must-revalidate, proxy-revalidate, pre-check=0, post-check=0, max-age=0");
_A_ADD_HEADER(request, "Pragma", "no-cache");
_A_ADD_HEADER(request, "Expires", "Mon, 3 Jan 2000 12:34:56 GMT");

char header_buf[256];

ADD_TIME_HEADER("X-Timestamp", us_get_now_real());

_A_ADD_HEADER(request, "X-UStreamer-Online", us_bool_to_string(ex->frame->online));
ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", ex->dropped);
ADD_UNSIGNED_HEADER("X-UStreamer-Width", ex->frame->width);
ADD_UNSIGNED_HEADER("X-UStreamer-Height", ex->frame->height);
ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", ex->frame->grab_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", ex->frame->encode_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", ex->frame->encode_end_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", ex->expose_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", ex->expose_cmp_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", ex->expose_end_ts);
ADD_TIME_HEADER("X-UStreamer-Send-Timestamp", us_get_now_monotonic());

_A_ADD_HEADER(request, "Content-Type", "image/jpeg");

evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);

US_LIST_REMOVE(server->run->snapshot_clients, client);
free(client);
}
});

# undef ADD_UNSUGNED_HEADER
# undef ADD_TIME_HEADER
}

static void _http_refresher(int fd, short what, void *v_server) {
(void)fd;
(void)what;
Expand All @@ -907,7 +934,8 @@ static void _http_refresher(int fd, short what, void *v_server) {
stream_updated = true;
}

_http_queue_send_stream(server, stream_updated, frame_updated);
_http_send_stream(server, stream_updated, frame_updated);
_http_send_snapshot(server);

if (
frame_updated
Expand Down
10 changes: 10 additions & 0 deletions src/ustreamer/http/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ typedef struct us_stream_client_sx {
US_LIST_STRUCT(struct us_stream_client_sx);
} us_stream_client_s;

typedef struct us_snapshot_client_sx {
struct us_server_sx *server;
struct evhttp_request *request;
ldf request_ts;

US_LIST_STRUCT(struct us_snapshot_client_sx);
} us_snapshot_client_s;

typedef struct {
us_frame_s *frame;
uint captured_fps;
Expand All @@ -83,6 +91,8 @@ typedef struct {

us_stream_client_s *stream_clients;
uint stream_clients_count;

us_snapshot_client_s *snapshot_clients;
} us_server_runtime_s;

typedef struct us_server_sx {
Expand Down
5 changes: 5 additions & 0 deletions src/ustreamer/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
US_CALLOC(run, 1);
US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init);
atomic_init(&run->http_has_clients, false);
atomic_init(&run->http_snapshot_requested, 0);
atomic_init(&run->http_last_request_ts, 0);
atomic_init(&run->http_captured_fps, 0);
atomic_init(&run->stop, false);
Expand Down Expand Up @@ -267,6 +268,9 @@ static void *_jpeg_thread(void *v_ctx) {
// pass
} else if (ready_wr->job_timely) {
_stream_expose_frame(stream, ready_job->dest);
if (atomic_load(&stream->run->http_snapshot_requested) > 0) { // Process real snapshots
atomic_fetch_sub(&stream->run->http_snapshot_requested, 1);
}
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else {
Expand Down Expand Up @@ -354,6 +358,7 @@ static bool _stream_has_any_clients(us_stream_s *stream) {
const us_stream_runtime_s *const run = stream->run;
return (
atomic_load(&run->http_has_clients)
|| (atomic_load(&run->http_snapshot_requested) > 0)
// has_clients синков НЕ обновляются в реальном времени
|| (stream->jpeg_sink != NULL && atomic_load(&stream->jpeg_sink->has_clients))
|| (run->h264 != NULL && /*run->h264->sink == NULL ||*/ atomic_load(&run->h264->sink->has_clients))
Expand Down
1 change: 1 addition & 0 deletions src/ustreamer/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ typedef struct {

us_ring_s *http_jpeg_ring;
atomic_bool http_has_clients;
atomic_uint http_snapshot_requested;
atomic_ullong http_last_request_ts; // Seconds
atomic_uint http_captured_fps;

Expand Down

0 comments on commit c24d633

Please sign in to comment.