⚡️ Improve webaudio sink sync by copying logic from localdevice sink
geekuillaume committed Oct 27, 2020
1 parent 70a7b8f commit f072df9
Showing 4 changed files with 50 additions and 139 deletions.
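In short, drift correction moves out of the AudioWorklet and onto the main thread, mirroring the localdevice sink: the sink now transforms each chunk with `DriftAwareAudioBufferTransformer` and posts it together with an absolute buffer offset, and the worklet just writes into its circular buffer and reads at its own audio clock. A sketch of the resulting message shape, inferred from the diff below (the interface name is invented; the code posts plain object literals):

```ts
// Sketch of the main-thread -> worklet message after this commit, inferred from the
// diff below. `ChunkMessage` is a hypothetical name for illustration only.
interface ChunkMessage {
  type: 'chunk';
  chunk: Float32Array; // drift-corrected interleaved samples
  offset: number;      // absolute circular-buffer write position (bufferTimestamp * channels)
}
```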
26 changes: 6 additions & 20 deletions src/audio/sinks/audioworklets/webaudio_sink_processor.audioworklet.ts
@@ -1,6 +1,5 @@
 import { CircularTypedArray } from '../../../utils/circularTypedArray';
-import { OPUS_ENCODER_RATE, OPUS_ENCODER_CHUNK_SAMPLES_COUNT, MAX_LATENCY } from '../../../utils/constants';
-import { SynchronizedAudioBuffer } from '../../../utils/audio/synchronizedAudioBuffer';
+import { OPUS_ENCODER_RATE, MAX_LATENCY } from '../../../utils/constants';

 const CHANNELS = 2;
 const BUFFER_SIZE = MAX_LATENCY * (OPUS_ENCODER_RATE / 1000) * CHANNELS;
@@ -14,11 +13,7 @@ declare const sampleRate: number;
 // @ts-ignore
 class RawPcmPlayerProcessor extends AudioWorkletProcessor {
   chunkBuffer = new Float32Array(128 * CHANNELS);
-  currentSampleIndex = -1;
   buffer = new CircularTypedArray(Float32Array, BUFFER_SIZE);
-  synchronizedBuffer: SynchronizedAudioBuffer;
-  lastReceivedStreamTime = -1;
-  audioClockDrift = -1;

   port: MessagePort;

@@ -28,27 +23,18 @@ class RawPcmPlayerProcessor extends AudioWorkletProcessor {
   }

   handleMessage_(event) {
-    if (event.data.type === 'init') {
-      this.synchronizedBuffer = new SynchronizedAudioBuffer(this.buffer, CHANNELS, this.getIdealAudioPosition, { debug: event.data.debug });
-    }
     if (event.data.type === 'chunk') {
-      const offset = event.data.i * OPUS_ENCODER_CHUNK_SAMPLES_COUNT * CHANNELS;
-      this.buffer.set(event.data.chunk, offset);
-      this.audioClockDrift = event.data.audioClockDrift;
+      this.buffer.set(event.data.chunk, event.data.offset);
+      console.log(`received chunk at diff: ${(event.data.offset / 2) - currentFrame}`);
     }
   }

-  getIdealAudioPosition = () => currentFrame - this.audioClockDrift
-
   process(inputs, outputs) {
-    if (!this.synchronizedBuffer || this.audioClockDrift === -1) {
-      return true;
-    }
-    const chunkBuffer = this.synchronizedBuffer.readNextChunk(outputs[0][0].length);
+    this.buffer.getInTypedArray(this.chunkBuffer, currentFrame * CHANNELS, this.chunkBuffer.length);

     for (let sampleIndex = 0; sampleIndex < outputs[0][0].length; sampleIndex++) {
-      outputs[0][0][sampleIndex] = chunkBuffer[sampleIndex * CHANNELS];
-      outputs[0][1][sampleIndex] = chunkBuffer[sampleIndex * CHANNELS + 1];
+      outputs[0][0][sampleIndex] = this.chunkBuffer[sampleIndex * CHANNELS];
+      outputs[0][1][sampleIndex] = this.chunkBuffer[sampleIndex * CHANNELS + 1];
     }

     return true;
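The `console.log` added above divides the interleaved offset by 2 (the `CHANNELS` constant) to get a per-channel frame index, so `received chunk at diff` reports how many frames ahead of the worklet's playhead (`currentFrame`) the chunk was written. A small sketch of that arithmetic, with a hypothetical helper name:

```ts
// Hypothetical helper mirroring the logged quantity in the worklet above.
// `offsetInterleaved` counts samples across both channels (frames * CHANNELS).
const CHANNELS = 2;

function framesAheadOfPlayhead(offsetInterleaved: number, currentFrame: number): number {
  const chunkStartFrame = offsetInterleaved / CHANNELS; // same as `offset / 2` in the log
  return chunkStartFrame - currentFrame; // positive: buffered ahead; negative: arrived late
}

// e.g. a chunk written at interleaved offset 96_000 while the playhead is at frame 47_520
// is 480 frames (10 ms at 48 kHz) ahead of playback.
```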
77 changes: 40 additions & 37 deletions src/audio/sinks/webaudio_sink.ts
@@ -1,15 +1,15 @@
 // This is only used in a browser context

-import debug from '../../utils/environment/log';
 import { AudioChunkStreamOutput } from '../../utils/audio/chunk_stream';
 import { isBrowser } from '../../utils/environment/isBrowser';
 import { AudioSink } from './audio_sink';
 import { AudioSource } from '../sources/audio_source';
 import { WebAudioSinkDescriptor } from './sink_type';
 import { AudioSourcesSinksManager } from '../audio_sources_sinks_manager';
 import { AudioInstance } from '../utils';
-import { OPUS_ENCODER_RATE } from '../../utils/constants';
+import { OPUS_ENCODER_CHUNK_SAMPLES_COUNT, OPUS_ENCODER_RATE } from '../../utils/constants';
 import { NumericStatsTracker } from '../../utils/basicNumericStatsTracker';
+import { DriftAwareAudioBufferTransformer } from '../../utils/audio/synchronizedAudioBuffer';

 const AUDIO_DRIFT_HISTORY_INTERVAL = 50;
 const AUDIO_DRIFT_HISTORY_DURATION = 2 * 60 * 1000;
@@ -19,10 +19,11 @@ export const isWebAudioAvailable = () => typeof AudioContext === 'function' && t
 export class WebAudioSink extends AudioSink {
   type: 'webaudio' = 'webaudio';
   local: true = true;
-  workletNode: AudioWorkletNode;
-  context: AudioContext;
-  cleanAudioContext;
-  audioClockDriftHistory = new NumericStatsTracker<number>((v) => v, AUDIO_DRIFT_HISTORY_DURATION / AUDIO_DRIFT_HISTORY_INTERVAL);
+  private workletNode: AudioWorkletNode;
+  private context: AudioContext;
+  private cleanAudioContext: () => void;
+  private audioClockDriftHistory = new NumericStatsTracker<number>((v) => v, AUDIO_DRIFT_HISTORY_DURATION / AUDIO_DRIFT_HISTORY_INTERVAL);
+  private audioBufferTransformer: DriftAwareAudioBufferTransformer;

   constructor(descriptor: WebAudioSinkDescriptor, manager: AudioSourcesSinksManager) {
     super(descriptor, manager);
@@ -38,18 +39,6 @@ export class WebAudioSink extends AudioSink {
       throw new Error('Webaudio sink already started');
     }

-    // we cannot put this class in the global file scope as it will be created by the nodejs process
-    // which will throw an error because AudioWorkletNode only exists browser side
-    class RawPcmPlayerWorklet extends AudioWorkletNode {
-      constructor(context) {
-        super(context, 'rawPcmPlayerProcessor', {
-          numberOfOutputs: 1,
-          numberOfInputs: 0,
-          outputChannelCount: [source.channels],
-        });
-      }
-    }
-
     if (!this.context) {
       this.context = new AudioContext({
         sampleRate: OPUS_ENCODER_RATE,
@@ -59,25 +48,18 @@ export class WebAudioSink extends AudioSink {
         latencyHint: 0.01,
       });
     }
-    // eslint-disable-next-line
-    const audioworkletPath = require('./audioworklets/webaudio_sink_processor.audioworklet.ts');
-    await this.context.audioWorklet.addModule(audioworkletPath);
-    this.workletNode = new RawPcmPlayerWorklet(this.context);
-    this.workletNode.port.postMessage({
-      type: 'init',
-      debug: debug.enabled('soundsync:audioSinkDebug'),
-    });
     const volumeNode = this.context.createGain();
     volumeNode.gain.value = this.volume;
-    this.workletNode.connect(volumeNode);
-    volumeNode.connect(this.context.destination);
-
-    this.context.resume();
-
     const syncDeviceVolume = () => {
       volumeNode.gain.value = this.volume;
     };
     this.on('update', syncDeviceVolume);

+    this.audioBufferTransformer = new DriftAwareAudioBufferTransformer(
+      this.channels,
+      () => Math.floor(this.audioClockDriftHistory.mean()),
+    );
+
     const driftRegisterInterval = setInterval(this.registerDrift, AUDIO_DRIFT_HISTORY_INTERVAL);
     // this should be set before any await to make sure we have the clean method available if _stopSink is called between _startSink ends
     this.cleanAudioContext = () => {
@@ -87,9 +69,30 @@ export class WebAudioSink extends AudioSink {
       this.context.suspend();
       delete this.context;
       this.cleanAudioContext = undefined;
+      this.audioClockDriftHistory.flush();
+      clearInterval(driftRegisterInterval);
     };

+    // we cannot put this class in the global file scope as it will be created by the nodejs process
+    // which will throw an error because AudioWorkletNode only exists browser side
+    class RawPcmPlayerWorklet extends AudioWorkletNode {
+      constructor(context) {
+        super(context, 'rawPcmPlayerProcessor', {
+          numberOfOutputs: 1,
+          numberOfInputs: 0,
+          outputChannelCount: [source.channels],
+        });
+      }
+    }
+    // eslint-disable-next-line
+    const audioworkletPath = require('./audioworklets/webaudio_sink_processor.audioworklet.ts');
+    await this.context.audioWorklet.addModule(audioworkletPath);
+    this.workletNode = new RawPcmPlayerWorklet(this.context);
+    this.workletNode.connect(volumeNode);
+    volumeNode.connect(this.context.destination);
+
+    this.context.resume();

     // The context can be blocked from starting because of new webaudio changes
     // we need to wait for a user input to start it
     if (this.context.state === 'suspended') {
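The body of this `if` is folded out of this view. For context, browser autoplay policy can keep a fresh AudioContext in the `'suspended'` state until a user gesture; a generic workaround, sketched here without claiming it matches the folded code, looks like:

```ts
// Sketch: resume a suspended AudioContext on the first user gesture.
// This is a generic autoplay-policy workaround, not necessarily the folded code.
async function resumeOnUserGesture(context: AudioContext): Promise<void> {
  if (context.state !== 'suspended') {
    return;
  }
  await new Promise<void>((resolve) => {
    const onInteraction = async () => {
      document.removeEventListener('click', onInteraction);
      await context.resume();
      resolve();
    };
    document.addEventListener('click', onInteraction);
  });
}
```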
@@ -120,12 +123,12 @@
       return;
     }
     const chunk = new Float32Array(data.chunk.buffer, data.chunk.byteOffset, data.chunk.byteLength / Float32Array.BYTES_PER_ELEMENT);
+    const { bufferTimestamp, buffer } = this.audioBufferTransformer.transformChunk(chunk, (data.i * OPUS_ENCODER_CHUNK_SAMPLES_COUNT));
     this.workletNode.port.postMessage({
       type: 'chunk',
-      i: data.i,
-      chunk,
-      audioClockDrift: this.audioClockDriftHistory.full(1) ? this.audioClockDriftHistory.mean() : -1,
-    }, [chunk.buffer]); // we transfer the chunk.buffer to the audio worklet to prevent a memory copy
+      chunk: buffer,
+      offset: bufferTimestamp * this.channels,
+    }, [buffer.buffer]); // we transfer the buffer.buffer to the audio worklet to prevent a memory copy
   }

   registerDrift = () => {
@@ -134,12 +137,12 @@ export class WebAudioSink extends AudioSink {
       // audio context is not started yet, could be because it's waiting for a user interaction to start
       return;
     }
-    const audioClockDrift = ((
+    const audioClockDrift = ((contextTime * 1000) - (
       this.pipedSource.peer.getCurrentTime(true)
       - this.pipedSource.startedAt
      - this.pipedSource.latency
      + this.latencyCorrection
-    ) - (contextTime * 1000)) * (this.rate / 1000);
+    )) * (this.rate / 1000);
     if (!Number.isNaN(audioClockDrift)) {
       this.audioClockDriftHistory.push(audioClockDrift);
     }
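The reordered formula now reads as (local audio clock) minus (expected stream position), scaled from milliseconds to samples, so a positive drift means the context clock runs ahead of the stream. A worked sketch with illustrative numbers (assuming a 48 kHz OPUS_ENCODER_RATE):

```ts
// Illustrative values only: the context clock sits at 10.000 s while the stream
// position (peer time - startedAt - latency + latencyCorrection) is 9.990 s.
const rate = 48000;            // assumed OPUS_ENCODER_RATE, in samples per second
const contextTimeMs = 10_000;  // contextTime * 1000
const streamPositionMs = 9_990;
const audioClockDrift = (contextTimeMs - streamPositionMs) * (rate / 1000);
console.log(audioClockDrift);  // 480 -> the audio clock is 480 samples (10 ms) ahead
```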
81 changes: 0 additions & 81 deletions src/utils/audio/synchronizedAudioBuffer.ts
@@ -3,7 +3,6 @@
 // It will hard or soft sync depending on the clock drift between the audio device and the ideal time

 import { l } from '../environment/log';
-import { CircularTypedArray } from '../circularTypedArray';
 import { HARD_SYNC_MIN_AUDIO_DRIFT, SOFT_SYNC_MIN_AUDIO_DRIFT, OPUS_ENCODER_RATE } from '../constants';

 const DRIFT_CORRECTION_MIN_INTERVAL = OPUS_ENCODER_RATE * 5; // force minimum 5 seconds between each latency correction
@@ -114,83 +113,3 @@ export class DriftAwareAudioBufferTransformer {
     return { bufferTimestamp, buffer };
   }
 }
-
-export class SynchronizedAudioBuffer {
-  private returnBuffer = Buffer.alloc(128 * Float32Array.BYTES_PER_ELEMENT * 2); // start with a reasonably large buffer that will be resized if necessary
-  private typedReturnBuffer = new Float32Array(this.returnBuffer.buffer);
-  private delayedDriftCorrection = 0;
-  // number of samples to wait until starting to correct for drift again, used to prevent over correction
-  private ignoreDriftFor = 0;
-  public softSyncThreshold: number;
-  public hardSyncThreshold: number;
-
-  constructor(
-    public buffer: CircularTypedArray<Float32Array>,
-    public channels: number,
-    public idealPositionPerChannelGetter: () => number,
-    {
-      debug = false,
-      softSyncThreshold = SOFT_SYNC_MIN_AUDIO_DRIFT,
-      hardSyncThreshold = HARD_SYNC_MIN_AUDIO_DRIFT,
-    } = {},
-  ) {
-    // eslint-disable-next-line no-console
-    this.log = debug ? console.log : () => null;
-    this.softSyncThreshold = softSyncThreshold;
-    this.hardSyncThreshold = hardSyncThreshold;
-  }
-
-  log: (str: string) => void;
-
-  readNextChunk(samplesPerChannel: number) {
-    const idealBufferPosition = this.idealPositionPerChannelGetter() * this.channels;
-    let sampleDelta = 0;
-    if (this.buffer.getReaderPointer() === 0) {
-      this.buffer.setReaderPointer(idealBufferPosition);
-    }
-    // this.log(`= ideal position ${idealBufferPosition}, current position ${this.buffer.getReaderPointer()}, diff ${idealBufferPosition - this.buffer.getReaderPointer()}`);
-    if (this.delayedDriftCorrection) {
-      sampleDelta = Math.floor(Math.min(samplesPerChannel * 0.02, Math.abs(this.delayedDriftCorrection) * 0.1)) * Math.sign(this.delayedDriftCorrection); // max 1% sample to remove or duplicate, or 10% of drift
-      this.delayedDriftCorrection -= sampleDelta;
-      if (sampleDelta === 0) {
-        this.log(`= finished delayed soft drift correction`);
-        this.delayedDriftCorrection = 0;
-        this.ignoreDriftFor = DRIFT_CORRECTION_MIN_INTERVAL;
-      }
-    } else if (!this.ignoreDriftFor) {
-      const drift = Math.floor(idealBufferPosition - this.buffer.getReaderPointer());
-      const driftDuration = drift / (OPUS_ENCODER_RATE / 1000);
-      if (Math.abs(driftDuration) > this.hardSyncThreshold) {
-        // the drift is too important, this can happens in case the CPU was locked for a while (after suspending the device for example)
-        // this will induce a audible glitch
-        this.buffer.setReaderPointer(idealBufferPosition);
-        this.ignoreDriftFor = DRIFT_CORRECTION_MIN_INTERVAL;
-        this.log(`= hard sync: ${driftDuration}ms`);
-      } else if (Math.abs(driftDuration) > this.softSyncThreshold) {
-        // we should be correcting for the drift but it's small enough that we can do this only by adding
-        // or removing some samples in the output buffer
-        // if drift is > 0, it means the audio device is going too fast
-        // so we need to slow down the rate at which we read from the audio buffer to go back to the correct time
-        sampleDelta = Math.floor(Math.min(samplesPerChannel * 0.02, Math.abs(drift) * 0.1)) * Math.sign(drift); // max 1% sample to remove or duplicate, or 10% of drift
-        this.ignoreDriftFor = DRIFT_CORRECTION_MIN_INTERVAL;
-        this.delayedDriftCorrection = Math.floor((drift - sampleDelta) * 0.4);
-        this.log(`= soft sync: ${driftDuration}ms (${drift} samples), injecting ${sampleDelta} samples now`);
-      }
-    }
-    if (this.ignoreDriftFor) {
-      this.ignoreDriftFor = Math.max(0, this.ignoreDriftFor - samplesPerChannel);
-    }
-    const samplesToReadByChannel = samplesPerChannel + sampleDelta;
-    if (this.returnBuffer.byteLength !== samplesToReadByChannel * this.channels * Float32Array.BYTES_PER_ELEMENT) {
-      this.returnBuffer = Buffer.alloc(samplesToReadByChannel * this.channels * Float32Array.BYTES_PER_ELEMENT);
-      this.typedReturnBuffer = new Float32Array(this.returnBuffer.buffer);
-    }
-    this.buffer.getAtReaderPointerInTypedArray(this.typedReturnBuffer, samplesToReadByChannel * this.channels);
-    const buffer = smartResizeAudioBuffer(
-      this.typedReturnBuffer,
-      samplesPerChannel,
-      this.channels,
-    );
-    return buffer;
-  }
-}
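The deleted `SynchronizedAudioBuffer` corrected drift at read time, inside the worklet; the retained `DriftAwareAudioBufferTransformer` (defined earlier in this file, folded above) now does the equivalent work on the main thread, one chunk at a time, before the data is posted. A sketch of its call shape as used by `webaudio_sink.ts` in this commit (the wrapper function is hypothetical):

```ts
// Hypothetical wrapper showing how the transformer is driven in webaudio_sink.ts:
// it stretches or shifts the chunk according to the mean measured drift and returns
// the corrected samples plus the per-channel position they should be written at.
function sendChunk(
  transformer: DriftAwareAudioBufferTransformer,
  port: MessagePort,
  chunk: Float32Array, // interleaved samples for one stream chunk
  chunkIndex: number,  // data.i, the chunk counter from the stream
  channels: number,
) {
  const { bufferTimestamp, buffer } = transformer.transformChunk(
    chunk,
    chunkIndex * OPUS_ENCODER_CHUNK_SAMPLES_COUNT, // ideal per-channel position
  );
  port.postMessage(
    { type: 'chunk', chunk: buffer, offset: bufferTimestamp * channels },
    [buffer.buffer], // transfer the backing buffer instead of copying it
  );
}
```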
5 changes: 4 additions & 1 deletion src/utils/circularTypedArray.ts
@@ -53,7 +53,10 @@ export class CircularTypedArray<T extends TypedArray> {
   }

   set(data: T, offset: number) {
-    const realOffset = offset % this.buffer.length;
+    let realOffset = offset % this.buffer.length;
+    if (realOffset < 0) {
+      realOffset += this.buffer.length;
+    }
     const overflow = Math.max(0, (realOffset + data.length) - this.buffer.length);
     if (!overflow) {
       this.buffer.set(data, realOffset);
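This guard matters because JavaScript's `%` is a remainder operator, not a Euclidean modulo: when the offset is negative (which presumably can now happen when a drift-corrected `bufferTimestamp` falls before the buffer origin), `offset % length` is negative too. A quick illustration:

```ts
// JavaScript's % keeps the sign of the dividend, so a negative offset
// indexes out of range unless it is wrapped back into [0, length).
const length = 10;
console.log(-3 % length);            // -3, not 7
console.log((-3 % length) + length); // 7, the index this commit's fix-up produces
```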
