Skip to content

Commit

Permalink
janus: using ring for audio pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Feb 29, 2024
1 parent 12937b9 commit 4296d5d
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 51 deletions.
117 changes: 69 additions & 48 deletions janus/src/audio.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ typedef struct {
} _enc_buffer_s;


static _pcm_buffer_s *_pcm_buffer_init(void);
static _enc_buffer_s *_enc_buffer_init(void);

static void *_pcm_thread(void *v_audio);
static void *_encoder_thread(void *v_audio);

Expand All @@ -72,8 +75,8 @@ us_audio_s *us_audio_init(const char *name, unsigned pcm_hz) {
us_audio_s *audio;
US_CALLOC(audio, 1);
audio->pcm_hz = pcm_hz;
audio->pcm_queue = us_queue_init(8);
audio->enc_queue = us_queue_init(8);
US_RING_INIT_WITH_ITEMS(audio->pcm_ring, 8, _pcm_buffer_init);
US_RING_INIT_WITH_ITEMS(audio->enc_ring, 8, _enc_buffer_init);
atomic_init(&audio->stop, false);

int err;
Expand Down Expand Up @@ -151,8 +154,8 @@ void us_audio_destroy(us_audio_s *audio) {
US_DELETE(audio->res, speex_resampler_destroy);
US_DELETE(audio->pcm, snd_pcm_close);
US_DELETE(audio->pcm_params, snd_pcm_hw_params_free);
US_QUEUE_DELETE_WITH_ITEMS(audio->enc_queue, free);
US_QUEUE_DELETE_WITH_ITEMS(audio->pcm_queue, free);
US_RING_DELETE_WITH_ITEMS(audio->enc_ring, free);
US_RING_DELETE_WITH_ITEMS(audio->pcm_ring, free);
if (audio->tids_created) {
US_JLOG_INFO("audio", "Pipeline closed");
}
Expand All @@ -163,19 +166,32 @@ int us_audio_get_encoded(us_audio_s *audio, uint8_t *data, size_t *size, uint64_
if (atomic_load(&audio->stop)) {
return -1;
}
_enc_buffer_s *buf;
if (!us_queue_get(audio->enc_queue, (void **)&buf, 0.1)) {
if (*size < buf->used) {
free(buf);
return -3;
}
memcpy(data, buf->data, buf->used);
*size = buf->used;
*pts = buf->pts;
free(buf);
return 0;
const int ri = us_ring_consumer_acquire(audio->enc_ring, 0.1);
if (ri < 0) {
return -2;
}
return -2;
_enc_buffer_s *const buf = audio->enc_ring->items[ri];
if (*size < buf->used) {
us_ring_consumer_release(audio->enc_ring, ri);
return -3;
}
memcpy(data, buf->data, buf->used);
*size = buf->used;
*pts = buf->pts;
us_ring_consumer_release(audio->enc_ring, ri);
return 0;
}

static _pcm_buffer_s *_pcm_buffer_init(void) {
_pcm_buffer_s *buf;
US_CALLOC(buf, 1);
return buf;
}

static _enc_buffer_s *_enc_buffer_init(void) {
_enc_buffer_s *buf;
US_CALLOC(buf, 1);
return buf;
}

static void *_pcm_thread(void *v_audio) {
Expand All @@ -194,13 +210,13 @@ static void *_pcm_thread(void *v_audio) {
break;
}

if (us_queue_get_free(audio->pcm_queue)) {
_pcm_buffer_s *out;
US_CALLOC(out, 1);
const int ri = us_ring_producer_acquire(audio->pcm_ring, 0);
if (ri >= 0) {
_pcm_buffer_s *const out = audio->pcm_ring->items[ri];
memcpy(out->data, in, audio->pcm_size);
assert(!us_queue_put(audio->pcm_queue, out, 0));
us_ring_producer_release(audio->pcm_ring, ri);
} else {
US_JLOG_ERROR("audio", "PCM queue is full");
US_JLOG_ERROR("audio", "PCM ring is full");
}
}

Expand All @@ -215,39 +231,44 @@ static void *_encoder_thread(void *v_audio) {
int16_t in_res[_MAX_BUF16];

while (!atomic_load(&audio->stop)) {
_pcm_buffer_s *in;
if (!us_queue_get(audio->pcm_queue, (void **)&in, 0.1)) {
int16_t *in_ptr;
if (audio->res != NULL) {
assert(audio->pcm_hz != _ENCODER_INPUT_HZ);
uint32_t in_count = audio->pcm_frames;
uint32_t out_count = _HZ_TO_FRAMES(_ENCODER_INPUT_HZ);
speex_resampler_process_interleaved_int(audio->res, in->data, &in_count, in_res, &out_count);
in_ptr = in_res;
} else {
assert(audio->pcm_hz == _ENCODER_INPUT_HZ);
in_ptr = in->data;
}
const int in_ri = us_ring_consumer_acquire(audio->pcm_ring, 0.1);
if (in_ri < 0) {
continue;
}
_pcm_buffer_s *const in = audio->pcm_ring->items[in_ri];

int16_t *in_ptr;
if (audio->res != NULL) {
assert(audio->pcm_hz != _ENCODER_INPUT_HZ);
uint32_t in_count = audio->pcm_frames;
uint32_t out_count = _HZ_TO_FRAMES(_ENCODER_INPUT_HZ);
speex_resampler_process_interleaved_int(audio->res, in->data, &in_count, in_res, &out_count);
in_ptr = in_res;
} else {
assert(audio->pcm_hz == _ENCODER_INPUT_HZ);
in_ptr = in->data;
}

_enc_buffer_s *out;
US_CALLOC(out, 1);
const int size = opus_encode(audio->enc, in_ptr, _HZ_TO_FRAMES(_ENCODER_INPUT_HZ), out->data, US_ARRAY_LEN(out->data));
free(in);
if (size < 0) {
_JLOG_PERROR_OPUS(size, "audio", "Fatal: Can't encode PCM frame to OPUS");
free(out);
break;
}
const int out_ri = us_ring_producer_acquire(audio->enc_ring, 0);
if (out_ri < 0) {
US_JLOG_ERROR("audio", "OPUS encoder queue is full");
us_ring_consumer_release(audio->pcm_ring, in_ri);
continue;
}
_enc_buffer_s *const out = audio->enc_ring->items[out_ri];

const int size = opus_encode(audio->enc, in_ptr, _HZ_TO_FRAMES(_ENCODER_INPUT_HZ), out->data, US_ARRAY_LEN(out->data));
us_ring_consumer_release(audio->pcm_ring, in_ri);

if (size >= 0) {
out->used = size;
out->pts = audio->pts;
// https://datatracker.ietf.org/doc/html/rfc7587#section-4.2
audio->pts += _HZ_TO_FRAMES(_ENCODER_INPUT_HZ);

if (us_queue_put(audio->enc_queue, out, 0) != 0) {
US_JLOG_ERROR("audio", "OPUS encoder queue is full");
free(out);
}
} else {
_JLOG_PERROR_OPUS(size, "audio", "Fatal: Can't encode PCM frame to OPUS");
}
us_ring_producer_release(audio->enc_ring, out_ri);
}

atomic_store(&audio->stop, true);
Expand Down
6 changes: 3 additions & 3 deletions janus/src/audio.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

#include "uslibs/tools.h"
#include "uslibs/array.h"
#include "uslibs/queue.h"
#include "uslibs/ring.h"
#include "uslibs/threading.h"

#include "logging.h"
Expand All @@ -52,8 +52,8 @@ typedef struct {
SpeexResamplerState *res;
OpusEncoder *enc;

us_queue_s *pcm_queue;
us_queue_s *enc_queue;
us_ring_s *pcm_ring;
us_ring_s *enc_ring;
uint32_t pts;

pthread_t pcm_tid;
Expand Down

0 comments on commit 4296d5d

Please sign in to comment.