Skip to content

Commit

Permalink
fix(publisher): Decouple state management and improve test coverage (#…
Browse files Browse the repository at this point in the history
…1634)

### Overview

Decouples the participant state management from the WebRTC Publisher.
Improves the test coverage.
  • Loading branch information
oliverlaz authored Jan 2, 2025
1 parent 3cc3eca commit 6684612
Show file tree
Hide file tree
Showing 16 changed files with 430 additions and 353 deletions.
89 changes: 51 additions & 38 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
Publisher,
Subscriber,
toRtcConfiguration,
trackTypeToParticipantStreamKey,
} from './rtc';
import {
registerEventHandlers,
Expand Down Expand Up @@ -123,6 +124,7 @@ import {
import { getSdkSignature } from './stats/utils';
import { withoutConcurrency } from './helpers/concurrency';
import { ensureExhausted } from './helpers/ensureExhausted';
import { pushToIfMissing } from './helpers/array';
import {
makeSafePromise,
PromiseWithResolvers,
Expand Down Expand Up @@ -1504,15 +1506,9 @@ export class Call {
const [videoTrack] = videoStream.getVideoTracks();
if (!videoTrack) throw new Error('There is no video track in the stream');

if (!this.trackPublishOrder.includes(TrackType.VIDEO)) {
this.trackPublishOrder.push(TrackType.VIDEO);
}

await this.publisher.publishStream(
videoStream,
videoTrack,
TrackType.VIDEO,
);
pushToIfMissing(this.trackPublishOrder, TrackType.VIDEO);
await this.publisher.publish(videoTrack, TrackType.VIDEO);
await this.updateLocalStreamState(videoStream, TrackType.VIDEO);
};

/**
Expand All @@ -1538,14 +1534,9 @@ export class Call {
const [audioTrack] = audioStream.getAudioTracks();
if (!audioTrack) throw new Error('There is no audio track in the stream');

if (!this.trackPublishOrder.includes(TrackType.AUDIO)) {
this.trackPublishOrder.push(TrackType.AUDIO);
}
await this.publisher.publishStream(
audioStream,
audioTrack,
TrackType.AUDIO,
);
pushToIfMissing(this.trackPublishOrder, TrackType.AUDIO);
await this.publisher.publish(audioTrack, TrackType.AUDIO);
await this.updateLocalStreamState(audioStream, TrackType.AUDIO);
};

/**
Expand All @@ -1572,41 +1563,63 @@ export class Call {
throw new Error('There is no screen share track in the stream');
}

if (!this.trackPublishOrder.includes(TrackType.SCREEN_SHARE)) {
this.trackPublishOrder.push(TrackType.SCREEN_SHARE);
}
await this.publisher.publishStream(
screenShareStream,
screenShareTrack,
TrackType.SCREEN_SHARE,
);
pushToIfMissing(this.trackPublishOrder, TrackType.SCREEN_SHARE);
await this.publisher.publish(screenShareTrack, TrackType.SCREEN_SHARE);

const [screenShareAudioTrack] = screenShareStream.getAudioTracks();
if (screenShareAudioTrack) {
if (!this.trackPublishOrder.includes(TrackType.SCREEN_SHARE_AUDIO)) {
this.trackPublishOrder.push(TrackType.SCREEN_SHARE_AUDIO);
}
await this.publisher.publishStream(
screenShareStream,
pushToIfMissing(this.trackPublishOrder, TrackType.SCREEN_SHARE_AUDIO);
await this.publisher.publish(
screenShareAudioTrack,
TrackType.SCREEN_SHARE_AUDIO,
);
}
await this.updateLocalStreamState(
screenShareStream,
...(screenShareAudioTrack
? [TrackType.SCREEN_SHARE, TrackType.SCREEN_SHARE_AUDIO]
: [TrackType.SCREEN_SHARE]),
);
};

/**
* Stops publishing the given track type to the call, if it is currently being published.
* Underlying track will be stopped and removed from the publisher.
*
* @param trackType the track type to stop publishing.
* @param stopTrack if `true` the track will be stopped, else it will be just disabled
* @param trackTypes the track types to stop publishing.
*/
stopPublish = async (trackType: TrackType, stopTrack: boolean = true) => {
this.logger(
'info',
`stopPublish ${TrackType[trackType]}, stop tracks: ${stopTrack}`,
stopPublish = async (...trackTypes: TrackType[]) => {
if (!this.sfuClient || !this.publisher) return;
await this.updateLocalStreamState(undefined, ...trackTypes);
};

/**
* Updates the call state with the new stream.
*
* @param mediaStream the new stream to update the call state with.
* If undefined, the stream will be removed from the call state.
* @param trackTypes the track types to update the call state with.
*/
private updateLocalStreamState = async (
mediaStream: MediaStream | undefined,
...trackTypes: TrackType[]
) => {
if (!this.sfuClient || !this.sfuClient.sessionId) return;
await this.sfuClient.updateMuteStates(
trackTypes.map((trackType) => ({ trackType, muted: !mediaStream })),
);
await this.publisher?.unpublishStream(trackType, stopTrack);

const sessionId = this.sfuClient.sessionId;
for (const trackType of trackTypes) {
const streamStateProp = trackTypeToParticipantStreamKey(trackType);
if (!streamStateProp) continue;

this.state.updateParticipant(sessionId, (p) => ({
publishedTracks: mediaStream
? pushToIfMissing([...p.publishedTracks], trackType)
: p.publishedTracks.filter((t) => t !== trackType),
[streamStateProp]: mediaStream,
}));
}
};

/**
Expand Down
20 changes: 7 additions & 13 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ import {
SendAnswerRequest,
SendStatsRequest,
SetPublisherRequest,
TrackMuteState,
TrackSubscriptionDetails,
UpdateMuteStatesRequest,
} from './gen/video/sfu/signal_rpc/signal';
import { ICETrickle, TrackType } from './gen/video/sfu/models/models';
import { ICETrickle } from './gen/video/sfu/models/models';
import { StreamClient } from './coordinator/connection/client';
import { generateUUIDv4 } from './coordinator/connection/utils';
import { Credentials } from './gen/coordinator';
import { Logger } from './coordinator/connection/types';
import { getLogger, getLogLevel } from './logger';
import {
promiseWithResolvers,
PromiseWithResolvers,
makeSafePromise,
PromiseWithResolvers,
promiseWithResolvers,
SafePromise,
} from './helpers/promise';
import { getTimers } from './timers';
Expand Down Expand Up @@ -340,17 +340,11 @@ export class StreamSfuClient {
);
};

updateMuteState = async (trackType: TrackType, muted: boolean) => {
await this.joinTask;
return this.updateMuteStates({ muteStates: [{ trackType, muted }] });
};

updateMuteStates = async (
data: Omit<UpdateMuteStatesRequest, 'sessionId'>,
) => {
updateMuteStates = async (muteStates: TrackMuteState[]) => {
await this.joinTask;
return retryable(
() => this.rpc.updateMuteStates({ ...data, sessionId: this.sessionId }),
() =>
this.rpc.updateMuteStates({ muteStates, sessionId: this.sessionId }),
this.abortController.signal,
);
};
Expand Down
Loading

0 comments on commit 6684612

Please sign in to comment.