Skip to content

Commit

Permalink
pipewire: Revise plugin code. Closes: #1401
Browse files Browse the repository at this point in the history
- Use RingBuf from audcore
- Respect output_buffer_size setting
- Improve get_delay() and drain() functions
- Add support for 5 and 7 channels

Co-authored-by: Thomas Lange <[email protected]>
  • Loading branch information
maris-ab and radioactiveman committed Dec 1, 2024
1 parent d306f1a commit 185ddd3
Showing 1 changed file with 59 additions and 29 deletions.
88 changes: 59 additions & 29 deletions src/pipewire/pipewire.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,19 @@

#include <libaudcore/i18n.h>
#include <libaudcore/plugin.h>
#include <libaudcore/ringbuf.h>
#include <libaudcore/runtime.h>

#if !PW_CHECK_VERSION(0, 3, 33)
#define PW_KEY_NODE_RATE "node.rate"
#define PW_KEY_NODE_RATE "node.rate"
#endif

#if !PW_CHECK_VERSION(0, 3, 50)
static inline int pw_stream_get_time_n(struct pw_stream * stream,
struct pw_time * time, size_t size)
{
return pw_stream_get_time(stream, time);
}
#endif

class PipeWireOutput : public OutputPlugin
Expand Down Expand Up @@ -103,9 +112,8 @@ class PipeWireOutput : public OutputPlugin
int m_aud_format = 0;
int m_core_init_seq = 0;

unsigned char * m_buffer = nullptr;
unsigned int m_buffer_at = 0;
unsigned int m_buffer_size = 0;
RingBuf<unsigned char> m_buffer;
unsigned int m_pw_buffer_size = 0;
unsigned int m_frames = 0;
unsigned int m_stride = 0;
unsigned int m_rate = 0;
Expand Down Expand Up @@ -164,31 +172,57 @@ void PipeWireOutput::pause(bool pause)

int PipeWireOutput::get_delay()
{
return (m_buffer_at / m_stride + m_frames) * 1000 / m_rate;
int buff_time = ((m_buffer.len() / m_stride) * 1000) / m_rate;
int pw_buff_time = ((m_pw_buffer_size / m_stride) * 1000) / m_rate;
int time_diff = 0;
int add_delay = 0;

// Get time difference from updated time snapshot of the stream
struct pw_time time;
if (pw_stream_get_time_n(m_stream, &time, sizeof time) == 0)
{
time_diff = (pw_stream_get_nsec(m_stream) - time.now) / SPA_NSEC_PER_MSEC;
time_diff = aud::clamp(time_diff, 0, pw_buff_time);
add_delay = (time.buffered + time.queued) * 1000 / m_rate;

if (time.rate.denom > 0)
add_delay += time.delay * 1000 * time.rate.num / time.rate.denom;
}

return buff_time + pw_buff_time - time_diff + add_delay;
}

void PipeWireOutput::drain()
{
pw_thread_loop_lock(m_loop);
if (m_buffer_at > 0)
pw_thread_loop_timed_wait(m_loop, 2);

int buflen;
while ((buflen = m_buffer.len()) > 0)
{
pw_thread_loop_timed_wait(m_loop, 1);
if (buflen <= m_buffer.len())
{
AUDERR("PipeWireOutput: buffer drain lock\n");
break;
}
}

pw_stream_flush(m_stream, true);
pw_thread_loop_timed_wait(m_loop, 2);
pw_thread_loop_timed_wait(m_loop, 1); // trigger on_drained() callback
pw_thread_loop_unlock(m_loop);
}

void PipeWireOutput::flush()
{
pw_thread_loop_lock(m_loop);
m_buffer_at = 0;
m_buffer.discard();
pw_thread_loop_unlock(m_loop);
pw_stream_flush(m_stream, false);
}

void PipeWireOutput::period_wait()
{
if (m_buffer_at != m_buffer_size)
if (m_buffer.space())
return;

pw_thread_loop_lock(m_loop);
Expand All @@ -200,12 +234,11 @@ int PipeWireOutput::write_audio(const void * data, int length)
{
pw_thread_loop_lock(m_loop);

auto size = aud::min<size_t>(m_buffer_size - m_buffer_at, length);
memcpy(m_buffer + m_buffer_at, data, size);
m_buffer_at += size;
length = aud::min(length, m_buffer.space());
m_buffer.copy_in(static_cast<const unsigned char *>(data), length);

pw_thread_loop_unlock(m_loop);
return size;
return length;
}

void PipeWireOutput::close_audio()
Expand Down Expand Up @@ -248,11 +281,7 @@ void PipeWireOutput::close_audio()
m_loop = nullptr;
}

if (m_buffer)
{
delete[] m_buffer;
m_buffer = nullptr;
}
m_buffer.destroy();
}

bool PipeWireOutput::open_audio(int format, int rate, int channels, String & error)
Expand Down Expand Up @@ -343,10 +372,9 @@ bool PipeWireOutput::init_core()
return false;
}

m_frames = aud_get_int("output_buffer_size") * m_rate / 1000;
m_stride = FMT_SIZEOF(m_aud_format) * m_channels;
m_frames = aud::clamp<int>(64, ceilf(2048 * m_rate / 48000.0f), 8192);
m_buffer_size = m_frames * m_stride;
m_buffer = new unsigned char[m_buffer_size];
m_buffer.alloc(m_frames * m_stride);

return true;
}
Expand Down Expand Up @@ -490,7 +518,7 @@ void PipeWireOutput::on_process(void * data)
struct spa_buffer * buf;
void * dst;

if (!o->m_buffer_at)
if (!o->m_buffer.len())
{
pw_thread_loop_signal(o->m_loop, false);
return;
Expand All @@ -510,13 +538,12 @@ void PipeWireOutput::on_process(void * data)
return;
}

auto size = aud::min<uint32_t>(buf->datas[0].maxsize, o->m_buffer_at);
memcpy(dst, o->m_buffer, size);
o->m_buffer_at -= size;
memmove(o->m_buffer, o->m_buffer + size, o->m_buffer_at);
auto size = aud::min<uint32_t>(buf->datas[0].maxsize, o->m_buffer.len());
o->m_pw_buffer_size = size;
o->m_buffer.move_out(static_cast<unsigned char *>(dst), size);

b->buffer->datas[0].chunk->offset = 0;
b->buffer->datas[0].chunk->size = o->m_buffer_size;
b->buffer->datas[0].chunk->size = size;
b->buffer->datas[0].chunk->stride = o->m_stride;

pw_stream_queue_buffer(o->m_stream, b);
Expand Down Expand Up @@ -570,15 +597,18 @@ void PipeWireOutput::set_channel_map(struct spa_audio_info_raw * info, int chann
info->position[8] = SPA_AUDIO_CHANNEL_RC;
// Fall through
case 8:
case 7:
info->position[6] = SPA_AUDIO_CHANNEL_FLC;
info->position[7] = SPA_AUDIO_CHANNEL_FRC;
// Fall through
case 6:
case 5:
info->position[4] = SPA_AUDIO_CHANNEL_RL;
info->position[5] = SPA_AUDIO_CHANNEL_RR;
// Fall through
case 4:
info->position[3] = SPA_AUDIO_CHANNEL_LFE;
if (channels != 5 && channels != 7)
info->position[3] = SPA_AUDIO_CHANNEL_LFE;
// Fall through
case 3:
info->position[2] = SPA_AUDIO_CHANNEL_FC;
Expand Down

0 comments on commit 185ddd3

Please sign in to comment.