Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Mar 3, 2024
1 parent 8fe411a commit ffa68a8
Showing 1 changed file with 54 additions and 47 deletions.
101 changes: 54 additions & 47 deletions src/ustreamer/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@
#endif


typedef struct {
pthread_t tid;
us_device_s *dev;
us_queue_s *queue;
pthread_mutex_t *mutex;
atomic_bool *stop;
} _releaser_context_s;

typedef struct {
pthread_t tid;
us_queue_s *queue;
Expand All @@ -64,20 +72,15 @@ typedef struct {
atomic_bool *stop;
} _h264_context_s;

typedef struct {
pthread_t tid;
us_device_s *dev;
us_queue_s *queue;
pthread_mutex_t *mutex;
atomic_bool *stop;
} _releaser_context_s;

static void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, bool online, uint captured_fps);

static void *_releaser_thread(void *v_ctx);
static void *_jpeg_thread(void *v_ctx);
static void *_h264_thread(void *v_ctx);
static void *_releaser_thread(void *v_ctx);

static void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, bool online, uint captured_fps);
static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue);

static bool _stream_has_any_clients(us_stream_s *stream);
static int _stream_init_loop(us_stream_s *stream);
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
Expand Down Expand Up @@ -272,6 +275,35 @@ void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, boo
atomic_store(&stream->run->http_capture_state, state);
}

static void *_releaser_thread(void *v_ctx) {
_releaser_context_s *ctx = v_ctx;

while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw;
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
continue;
}

while (atomic_load(&hw->refs) > 0) {
if (atomic_load(ctx->stop)) {
goto done;
}
usleep(5 * 1000);
}

US_MUTEX_LOCK(*ctx->mutex);
const int released = us_device_release_buffer(ctx->dev, hw);
US_MUTEX_UNLOCK(*ctx->mutex);
if (released < 0) {
goto done;
}
}

done:
atomic_store(ctx->stop, true); // Stop all other guys on error
return NULL;
}

static void *_jpeg_thread(void *v_ctx) {
_jpeg_context_s *ctx = v_ctx;
us_stream_s *stream = ctx->stream;
Expand Down Expand Up @@ -300,14 +332,10 @@ static void *_jpeg_thread(void *v_ctx) {
}
}

us_hw_buffer_s *hw;
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
us_hw_buffer_s *hw = _get_latest_hw(ctx->queue);
if (hw == NULL) {
continue;
}
while (!us_queue_is_empty(ctx->queue)) { // Берем только самый свежий кадр
us_device_buffer_decref(hw);
assert(!us_queue_get(ctx->queue, (void**)&hw, 0));
}

if ( // Если никто не смотрит MJPEG - пропускаем кадр
!atomic_load(&stream->run->http_has_clients)
Expand Down Expand Up @@ -345,14 +373,10 @@ static void *_h264_thread(void *v_ctx) {

ldf last_encode_ts = us_get_now_monotonic();
while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw;
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
us_hw_buffer_s *hw = _get_latest_hw(ctx->queue);
if (hw == NULL) {
continue;
}
while (!us_queue_is_empty(ctx->queue)) { // Берем только самый свежий кадр
us_device_buffer_decref(hw);
assert(!us_queue_get(ctx->queue, (void**)&hw, 0));
}

if (!us_memsink_server_check(ctx->h264->sink, NULL)) {
us_device_buffer_decref(hw);
Expand All @@ -371,33 +395,16 @@ static void *_h264_thread(void *v_ctx) {
return NULL;
}

static void *_releaser_thread(void *v_ctx) {
_releaser_context_s *ctx = v_ctx;

while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw;
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
continue;
}

while (atomic_load(&hw->refs) > 0) {
if (atomic_load(ctx->stop)) {
goto done;
}
usleep(5 * 1000);
}

US_MUTEX_LOCK(*ctx->mutex);
const int released = us_device_release_buffer(ctx->dev, hw);
US_MUTEX_UNLOCK(*ctx->mutex);
if (released < 0) {
goto done;
}
static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue) {
us_hw_buffer_s *hw;
if (us_queue_get(queue, (void**)&hw, 0.1) < 0) {
return NULL;
}

done:
atomic_store(ctx->stop, true); // Stop all other guys on error
return NULL;
while (!us_queue_is_empty(queue)) { // Берем только самый свежий кадр
us_device_buffer_decref(hw);
assert(!us_queue_get(queue, (void**)&hw, 0));
}
return hw;
}

static bool _stream_has_any_clients(us_stream_s *stream) {
Expand Down

0 comments on commit ffa68a8

Please sign in to comment.