Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Feb 29, 2024
1 parent d9401b7 commit a3f4294
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 63 deletions.
129 changes: 68 additions & 61 deletions src/ustreamer/http/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,10 @@ static char *_http_get_client_hostport(struct evhttp_request *request);
#define _A_EVBUFFER_ADD_PRINTF(x_buf, x_fmt, ...) assert(evbuffer_add_printf(x_buf, x_fmt, ##__VA_ARGS__) >= 0)

#define _VID(x_next) server->run->stream->run->video->x_next
#define _EX(x_next) server->run->exposed->x_next


us_server_s *us_server_init(us_stream_s *stream) {
us_exposed_s *exposed;
us_server_exposed_s *exposed;
US_CALLOC(exposed, 1);
exposed->frame = us_frame_init();

Expand Down Expand Up @@ -123,6 +122,7 @@ void us_server_destroy(us_server_s *server) {

int us_server_listen(us_server_s *server) {
us_server_runtime_s *const run = server->run;
us_server_exposed_s *const ex = run->exposed;
us_stream_s *const stream = run->stream;

{
Expand All @@ -138,9 +138,9 @@ int us_server_listen(us_server_s *server) {
assert(!evhttp_set_cb(run->http, "/stream", _http_callback_stream, (void *)server));
}

us_frame_copy(stream->blank, _EX(frame));
_EX(notify_last_width) = _EX(frame->width);
_EX(notify_last_height) = _EX(frame->height);
us_frame_copy(stream->blank, ex->frame);
ex->notify_last_width = ex->frame->width;
ex->notify_last_height = ex->frame->height;

if (server->exit_on_no_clients > 0) {
run->last_request_ts = us_get_now_monotonic();
Expand Down Expand Up @@ -404,6 +404,7 @@ static void _http_callback_static(struct evhttp_request *request, void *v_server
static void _http_callback_state(struct evhttp_request *request, void *v_server) {
us_server_s *const server = (us_server_s *)v_server;
us_server_runtime_s *const run = server->run;
us_server_exposed_s *const ex = run->exposed;
us_stream_s *const stream = run->stream;

PREPROCESS_REQUEST;
Expand Down Expand Up @@ -455,12 +456,12 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
" \"source\": {\"resolution\": {\"width\": %u, \"height\": %u},"
" \"online\": %s, \"desired_fps\": %u, \"captured_fps\": %u},"
" \"stream\": {\"queued_fps\": %u, \"clients\": %u, \"clients_stat\": {",
(server->fake_width ? server->fake_width : _EX(frame->width)),
(server->fake_height ? server->fake_height : _EX(frame->height)),
us_bool_to_string(_EX(frame->online)),
(server->fake_width ? server->fake_width : ex->frame->width),
(server->fake_height ? server->fake_height : ex->frame->height),
us_bool_to_string(ex->frame->online),
stream->dev->desired_fps,
_EX(captured_fps),
_EX(queued_fps),
ex->captured_fps,
ex->queued_fps,
run->stream_clients_count
);

Expand Down Expand Up @@ -488,12 +489,13 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)

static void _http_callback_snapshot(struct evhttp_request *request, void *v_server) {
us_server_s *const server = (us_server_s *)v_server;
us_server_exposed_s *const ex = server->run->exposed;

PREPROCESS_REQUEST;

struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD(buf, (const void *)_EX(frame->data), _EX(frame->used));
_A_EVBUFFER_ADD(buf, (const void*)ex->frame->data, ex->frame->used);

ADD_HEADER("Cache-Control", "no-store, no-cache, must-revalidate, proxy-revalidate, pre-check=0, post-check=0, max-age=0");
ADD_HEADER("Pragma", "no-cache");
Expand All @@ -513,16 +515,16 @@ static void _http_callback_snapshot(struct evhttp_request *request, void *v_serv

ADD_TIME_HEADER("X-Timestamp", us_get_now_real());

ADD_HEADER("X-UStreamer-Online", us_bool_to_string(_EX(frame->online)));
ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", _EX(dropped));
ADD_UNSIGNED_HEADER("X-UStreamer-Width", _EX(frame->width));
ADD_UNSIGNED_HEADER("X-UStreamer-Height", _EX(frame->height));
ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", _EX(frame->grab_ts));
ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", _EX(frame->encode_begin_ts));
ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", _EX(frame->encode_end_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", _EX(expose_begin_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", _EX(expose_cmp_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", _EX(expose_end_ts));
ADD_HEADER("X-UStreamer-Online", us_bool_to_string(ex->frame->online));
ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", ex->dropped);
ADD_UNSIGNED_HEADER("X-UStreamer-Width", ex->frame->width);
ADD_UNSIGNED_HEADER("X-UStreamer-Height", ex->frame->height);
ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", ex->frame->grab_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", ex->frame->encode_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", ex->frame->encode_end_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", ex->expose_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", ex->expose_cmp_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", ex->expose_end_ts);
ADD_TIME_HEADER("X-UStreamer-Send-Timestamp", us_get_now_monotonic());

# undef ADD_UNSUGNED_HEADER
Expand Down Expand Up @@ -605,6 +607,7 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_client) {
us_stream_client_s *const client = (us_stream_client_s *)v_client;
us_server_s *const server = client->server;
us_server_exposed_s *const ex = server->run->exposed;

const long double now = us_get_now_monotonic();
const long long now_second = us_floor_ms(now);
Expand Down Expand Up @@ -692,7 +695,7 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
"Content-Length: %zu" RN
"X-Timestamp: %.06Lf" RN
"%s",
(!client->zero_data ? _EX(frame->used) : 0),
(!client->zero_data ? ex->frame->used : 0),
us_get_now_real(),
(client->extra_headers ? "" : RN)
);
Expand All @@ -712,25 +715,25 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
"X-UStreamer-Send-Time: %.06Lf" RN
"X-UStreamer-Latency: %.06Lf" RN
RN,
us_bool_to_string(_EX(frame->online)),
_EX(dropped),
_EX(frame->width),
_EX(frame->height),
us_bool_to_string(ex->frame->online),
ex->dropped,
ex->frame->width,
ex->frame->height,
client->fps,
_EX(frame->grab_ts),
_EX(frame->encode_begin_ts),
_EX(frame->encode_end_ts),
_EX(expose_begin_ts),
_EX(expose_cmp_ts),
_EX(expose_end_ts),
ex->frame->grab_ts,
ex->frame->encode_begin_ts,
ex->frame->encode_end_ts,
ex->expose_begin_ts,
ex->expose_cmp_ts,
ex->expose_end_ts,
now,
now - _EX(frame->grab_ts)
now - ex->frame->grab_ts
);
}
}

if (!client->zero_data) {
_A_EVBUFFER_ADD(buf, (void *)_EX(frame->data), _EX(frame->used));
_A_EVBUFFER_ADD(buf, (void*)ex->frame->data, ex->frame->used);
}
_A_EVBUFFER_ADD_PRINTF(buf, RN "--" BOUNDARY RN);

Expand Down Expand Up @@ -780,6 +783,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha

static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated) {
us_server_runtime_s *const run = server->run;
us_server_exposed_s *const ex = run->exposed;

bool has_clients = false;
bool queued = false;
Expand Down Expand Up @@ -822,13 +826,13 @@ static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bo
static long long queued_fps_second = 0;
const long long now = us_floor_ms(us_get_now_monotonic());
if (now != queued_fps_second) {
_EX(queued_fps) = queued_fps_accum;
ex->queued_fps = queued_fps_accum;
queued_fps_accum = 0;
queued_fps_second = now;
}
queued_fps_accum += 1;
} else if (!has_clients) {
_EX(queued_fps) = 0;
ex->queued_fps = 0;
}
}

Expand All @@ -855,17 +859,18 @@ static void _http_refresher(int fd, short what, void *v_server) {
(void)what;

us_server_s *server = (us_server_s *)v_server;
us_server_exposed_s *ex = server->run->exposed;
bool stream_updated = false;
bool frame_updated = false;

if (atomic_load(&_VID(updated))) {
frame_updated = _expose_new_frame(server);
stream_updated = true;
} else if (_EX(expose_end_ts) + 1 < us_get_now_monotonic()) {
} else if (ex->expose_end_ts + 1 < us_get_now_monotonic()) {
US_LOG_DEBUG("HTTP: Repeating exposed ...");
_EX(expose_begin_ts) = us_get_now_monotonic();
_EX(expose_cmp_ts) = _EX(expose_begin_ts);
_EX(expose_end_ts) = _EX(expose_begin_ts);
ex->expose_begin_ts = us_get_now_monotonic();
ex->expose_cmp_ts = ex->expose_begin_ts;
ex->expose_end_ts = ex->expose_begin_ts;
frame_updated = true;
stream_updated = true;
}
Expand All @@ -876,56 +881,58 @@ static void _http_refresher(int fd, short what, void *v_server) {
frame_updated
&& server->notify_parent
&& (
_EX(notify_last_online) != _EX(frame->online)
|| _EX(notify_last_width) != _EX(frame->width)
|| _EX(notify_last_height) != _EX(frame->height)
ex->notify_last_online != ex->frame->online
|| ex->notify_last_width != ex->frame->width
|| ex->notify_last_height != ex->frame->height
)
) {
_EX(notify_last_online) = _EX(frame->online);
_EX(notify_last_width) = _EX(frame->width);
_EX(notify_last_height) = _EX(frame->height);
ex->notify_last_online = ex->frame->online;
ex->notify_last_width = ex->frame->width;
ex->notify_last_height = ex->frame->height;
us_process_notify_parent();
}
}

static bool _expose_new_frame(us_server_s *server) {
us_server_exposed_s *const ex = server->run->exposed;

bool updated = false;

US_MUTEX_LOCK(_VID(mutex));

US_LOG_DEBUG("HTTP: Updating exposed frame (online=%d) ...", _VID(frame->online));

_EX(captured_fps) = _VID(captured_fps);
_EX(expose_begin_ts) = us_get_now_monotonic();
ex->captured_fps = _VID(captured_fps);
ex->expose_begin_ts = us_get_now_monotonic();

if (server->drop_same_frames && _VID(frame->online)) {
bool need_drop = false;
bool maybe_same = false;
if (
(need_drop = (_EX(dropped) < server->drop_same_frames))
&& (maybe_same = us_frame_compare(_EX(frame), _VID(frame)))
(need_drop = (ex->dropped < server->drop_same_frames))
&& (maybe_same = us_frame_compare(ex->frame, _VID(frame)))
) {
_EX(expose_cmp_ts) = us_get_now_monotonic();
_EX(expose_end_ts) = _EX(expose_cmp_ts);
ex->expose_cmp_ts = us_get_now_monotonic();
ex->expose_end_ts = ex->expose_cmp_ts;
US_LOG_VERBOSE("HTTP: Dropped same frame number %u; cmp_time=%.06Lf",
_EX(dropped), _EX(expose_cmp_ts) - _EX(expose_begin_ts));
_EX(dropped) += 1;
ex->dropped, (ex->expose_cmp_ts - ex->expose_begin_ts));
ex->dropped += 1;
goto not_updated;
} else {
_EX(expose_cmp_ts) = us_get_now_monotonic();
ex->expose_cmp_ts = us_get_now_monotonic();
US_LOG_VERBOSE("HTTP: Passed same frame check (need_drop=%d, maybe_same=%d); cmp_time=%.06Lf",
need_drop, maybe_same, (_EX(expose_cmp_ts) - _EX(expose_begin_ts)));
need_drop, maybe_same, (ex->expose_cmp_ts - ex->expose_begin_ts));
}
}

us_frame_copy(_VID(frame), _EX(frame));
us_frame_copy(_VID(frame), ex->frame);

_EX(dropped) = 0;
_EX(expose_cmp_ts) = _EX(expose_begin_ts);
_EX(expose_end_ts) = us_get_now_monotonic();
ex->dropped = 0;
ex->expose_cmp_ts = ex->expose_begin_ts;
ex->expose_end_ts = us_get_now_monotonic();

US_LOG_VERBOSE("HTTP: Exposed frame: online=%d, exp_time=%.06Lf",
_EX(frame->online), _EX(expose_end_ts) - _EX(expose_begin_ts));
ex->frame->online, (ex->expose_end_ts - ex->expose_begin_ts));

updated = true;

Expand Down
4 changes: 2 additions & 2 deletions src/ustreamer/http/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ typedef struct {
bool notify_last_online;
unsigned notify_last_width;
unsigned notify_last_height;
} us_exposed_s;
} us_server_exposed_s;

typedef struct {
struct event_base *base;
Expand All @@ -125,7 +125,7 @@ typedef struct {

struct event *refresher;
us_stream_s *stream;
us_exposed_s *exposed;
us_server_exposed_s *exposed;

us_stream_client_s *stream_clients;
unsigned stream_clients_count;
Expand Down

0 comments on commit a3f4294

Please sign in to comment.