Skip to content

Commit

Permalink
janus: reducing memory allocating using ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Feb 29, 2024
1 parent 7bacef7 commit 27be234
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 25 deletions.
16 changes: 6 additions & 10 deletions janus/src/memsinkfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "logging.h"


int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id) {
int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s *mem, u64 last_id) {
const ldf deadline_ts = us_get_now_monotonic() + 1; // wait_timeout
ldf now_ts;
do {
Expand All @@ -57,8 +57,7 @@ int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id) {
return -2;
}

us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame_id, bool key_required) {
us_frame_s *frame = us_frame_init();
int us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, us_frame_s *frame, u64 *frame_id, bool key_required) {
us_frame_set_data(frame, mem->data, mem->used);
US_FRAME_COPY_META(mem, frame);
*frame_id = mem->id;
Expand All @@ -67,17 +66,14 @@ us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame
mem->key_requested = true;
}

bool ok = true;
bool retval = 0;
if (frame->format != V4L2_PIX_FMT_H264) {
US_JLOG_ERROR("video", "Got non-H264 frame from memsink");
ok = false;
retval = -1;
}
if (flock(fd, LOCK_UN) < 0) {
US_JLOG_PERROR("video", "Can't unlock memsink");
ok = false;
retval = -1;
}
if (!ok) {
US_DELETE(frame, us_frame_destroy);
}
return frame;
return retval;
}
4 changes: 2 additions & 2 deletions janus/src/memsinkfd.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@
#include "uslibs/memsinksh.h"


int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id);
us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame_id, bool key_required);
int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s *mem, u64 last_id);
int us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, us_frame_s *frame, u64 *frame_id, bool key_required);
40 changes: 27 additions & 13 deletions janus/src/plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
#include "uslibs/tools.h"
#include "uslibs/threading.h"
#include "uslibs/list.h"
#include "uslibs/queue.h"
#include "uslibs/ring.h"
#include "uslibs/memsinksh.h"

#include "const.h"
Expand All @@ -60,7 +60,7 @@ static const useconds_t _g_watchers_polling = 100000;

static us_janus_client_s *_g_clients = NULL;
static janus_callbacks *_g_gw = NULL;
static us_queue_s *_g_video_queue = NULL;
static us_ring_s *_g_video_ring = NULL;
static us_rtpv_s *_g_rtpv = NULL;
static us_rtpa_s *_g_rtpa = NULL;

Expand Down Expand Up @@ -105,13 +105,14 @@ static void *_video_rtp_thread(void *arg) {
atomic_store(&_g_video_rtp_tid_created, true);

while (!_STOP) {
us_frame_s *frame;
if (us_queue_get(_g_video_queue, (void **)&frame, 0.1) == 0) {
const int ri = us_ring_consumer_acquire(_g_video_ring, 0.1);
if (ri >= 0) {
us_frame_s *frame = _g_video_ring->items[ri];
_LOCK_VIDEO;
const bool zero_playout_delay = (frame->gop == 0);
us_rtpv_wrap(_g_rtpv, frame, zero_playout_delay);
_UNLOCK_VIDEO;
us_frame_destroy(frame);
us_ring_consumer_release(_g_video_ring, ri);
}
}
return NULL;
Expand All @@ -123,6 +124,7 @@ static void *_video_sink_thread(void *arg) {
US_THREAD_RENAME("us_video_sink");
atomic_store(&_g_video_sink_tid_created, true);

us_frame_s *drop = us_frame_init();
u64 frame_id = 0;
int once = 0;

Expand Down Expand Up @@ -152,18 +154,29 @@ static void *_video_sink_thread(void *arg) {
while (!_STOP && _HAS_WATCHERS) {
const int result = us_memsink_fd_wait_frame(fd, mem, frame_id);
if (result == 0) {
us_frame_s *const frame = us_memsink_fd_get_frame(fd, mem, &frame_id, atomic_load(&_g_key_required));
if (frame == NULL) {
const int ri = us_ring_producer_acquire(_g_video_ring, 0);
us_frame_s *frame;
if (ri >= 0) {
frame = _g_video_ring->items[ri];
} else {
US_ONCE({ US_JLOG_PERROR("video", "Video ring is full"); });
frame = drop;
}

if (us_memsink_fd_get_frame(fd, mem, frame, &frame_id, atomic_load(&_g_key_required)) < 0) {
if (ri >= 0) {
us_ring_producer_release(_g_video_ring, ri);
}
goto close_memsink;
}
if (frame->key) {
atomic_store(&_g_key_required, false);
}
if (us_queue_put(_g_video_queue, frame, 0) != 0) {
US_ONCE({ US_JLOG_PERROR("video", "Video queue is full"); });
us_frame_destroy(frame);

if (ri >= 0) {
us_ring_producer_release(_g_video_ring, ri);
}
} else if (result == -1) {
} else if (result != -2) {
goto close_memsink;
}
}
Expand All @@ -174,6 +187,7 @@ static void *_video_sink_thread(void *arg) {
US_JLOG_INFO("video", "Memsink closed");
sleep(1); // error_delay
}
us_frame_destroy(drop);
return NULL;
}

Expand Down Expand Up @@ -258,7 +272,7 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
}
_g_gw = gw;

_g_video_queue = us_queue_init(1024);
US_RING_INIT_WITH_ITEMS(_g_video_ring, 64, us_frame_init);
_g_rtpv = us_rtpv_init(_relay_rtp_clients);
if (_g_config->audio_dev_name != NULL && us_audio_probe(_g_config->audio_dev_name)) {
_g_rtpa = us_rtpa_init(_relay_rtp_clients);
Expand Down Expand Up @@ -286,7 +300,7 @@ static void _plugin_destroy(void) {
us_janus_client_destroy(client);
});

US_QUEUE_DELETE_WITH_ITEMS(_g_video_queue, us_frame_destroy);
US_RING_DELETE_WITH_ITEMS(_g_video_ring, us_frame_destroy);

US_DELETE(_g_rtpa, us_rtpa_destroy);
US_DELETE(_g_rtpv, us_rtpv_destroy);
Expand Down
1 change: 1 addition & 0 deletions janus/src/uslibs/ring.c
1 change: 1 addition & 0 deletions janus/src/uslibs/ring.h
86 changes: 86 additions & 0 deletions src/libs/ring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2023 Maxim Devaev <[email protected]> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/


#include <assert.h>

#include "ring.h"

#include "types.h"
#include "tools.h"
#include "queue.h"


int _acquire(us_ring_s *ring, us_queue_s *queue, ldf timeout);
void _release(us_ring_s *ring, us_queue_s *queue, uint index);


us_ring_s *us_ring_init(uint capacity) {
us_ring_s *ring;
US_CALLOC(ring, 1);
US_CALLOC(ring->items, capacity);
US_CALLOC(ring->places, capacity);
ring->capacity = capacity;
ring->producer = us_queue_init(capacity);
ring->consumer = us_queue_init(capacity);
for (uint index = 0; index < capacity; ++index) {
ring->places[index] = index; // XXX: Just to avoid casting between pointer and uint
assert(!us_queue_put(ring->producer, (void*)(ring->places + index), 0));
}
return ring;
}

void us_ring_destroy(us_ring_s *ring) {
us_queue_destroy(ring->consumer);
us_queue_destroy(ring->producer);
free(ring->places);
free(ring->items);
free(ring);
}

int us_ring_producer_acquire(us_ring_s *ring, ldf timeout) {
return _acquire(ring, ring->producer, timeout);
}

void us_ring_producer_release(us_ring_s *ring, uint index) {
_release(ring, ring->consumer, index);
}

int us_ring_consumer_acquire(us_ring_s *ring, ldf timeout) {
return _acquire(ring, ring->consumer, timeout);
}

void us_ring_consumer_release(us_ring_s *ring, uint index) {
_release(ring, ring->producer, index);
}

int _acquire(us_ring_s *ring, us_queue_s *queue, ldf timeout) {
(void)ring;
uint *place;
if (us_queue_get(queue, (void**)&place, timeout) < 0) {
return -1;
}
return *place;
}

void _release(us_ring_s *ring, us_queue_s *queue, uint index) {
assert(!us_queue_put(queue, (void*)(ring->places + index), 0));
}
63 changes: 63 additions & 0 deletions src/libs/ring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2023 Maxim Devaev <[email protected]> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/


#pragma once


#include "types.h"
#include "queue.h"


typedef struct {
uz capacity;
void **items;
uint *places;
us_queue_s *producer;
us_queue_s *consumer;
} us_ring_s;


#define US_RING_INIT_WITH_ITEMS(x_ring, x_capacity, x_init_item) { \
(x_ring) = us_ring_init(x_capacity); \
for (uz m_index = 0; m_index < (x_ring)->capacity; ++m_index) { \
(x_ring)->items[m_index] = x_init_item(); \
} \
}

#define US_RING_DELETE_WITH_ITEMS(x_ring, x_destroy_item) { \
if (x_ring) { \
for (uz m_index = 0; m_index < (x_ring)->capacity; ++m_index) { \
x_destroy_item((x_ring)->items[m_index]); \
} \
us_ring_destroy(x_ring); \
} \
}


us_ring_s *us_ring_init(uint capacity);
void us_ring_destroy(us_ring_s *ring);

int us_ring_producer_acquire(us_ring_s *ring, ldf timeout);
void us_ring_producer_release(us_ring_s *ring, uint index);

int us_ring_consumer_acquire(us_ring_s *ring, ldf timeout);
void us_ring_consumer_release(us_ring_s *ring, uint index);

0 comments on commit 27be234

Please sign in to comment.