diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index c974eaa20e..ef9c4ee3ec 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -7,6 +7,7 @@ import { Publisher, Subscriber, toRtcConfiguration, + trackTypeToParticipantStreamKey, } from './rtc'; import { registerEventHandlers, @@ -123,6 +124,7 @@ import { import { getSdkSignature } from './stats/utils'; import { withoutConcurrency } from './helpers/concurrency'; import { ensureExhausted } from './helpers/ensureExhausted'; +import { pushToIfMissing } from './helpers/array'; import { makeSafePromise, PromiseWithResolvers, @@ -1504,15 +1506,9 @@ export class Call { const [videoTrack] = videoStream.getVideoTracks(); if (!videoTrack) throw new Error('There is no video track in the stream'); - if (!this.trackPublishOrder.includes(TrackType.VIDEO)) { - this.trackPublishOrder.push(TrackType.VIDEO); - } - - await this.publisher.publishStream( - videoStream, - videoTrack, - TrackType.VIDEO, - ); + pushToIfMissing(this.trackPublishOrder, TrackType.VIDEO); + await this.publisher.publish(videoTrack, TrackType.VIDEO); + await this.updateLocalStreamState(videoStream, TrackType.VIDEO); }; /** @@ -1538,14 +1534,9 @@ export class Call { const [audioTrack] = audioStream.getAudioTracks(); if (!audioTrack) throw new Error('There is no audio track in the stream'); - if (!this.trackPublishOrder.includes(TrackType.AUDIO)) { - this.trackPublishOrder.push(TrackType.AUDIO); - } - await this.publisher.publishStream( - audioStream, - audioTrack, - TrackType.AUDIO, - ); + pushToIfMissing(this.trackPublishOrder, TrackType.AUDIO); + await this.publisher.publish(audioTrack, TrackType.AUDIO); + await this.updateLocalStreamState(audioStream, TrackType.AUDIO); }; /** @@ -1572,41 +1563,63 @@ export class Call { throw new Error('There is no screen share track in the stream'); } - if (!this.trackPublishOrder.includes(TrackType.SCREEN_SHARE)) { - this.trackPublishOrder.push(TrackType.SCREEN_SHARE); - } - await this.publisher.publishStream( - screenShareStream, - screenShareTrack, - TrackType.SCREEN_SHARE, - ); + pushToIfMissing(this.trackPublishOrder, TrackType.SCREEN_SHARE); + await this.publisher.publish(screenShareTrack, TrackType.SCREEN_SHARE); const [screenShareAudioTrack] = screenShareStream.getAudioTracks(); if (screenShareAudioTrack) { - if (!this.trackPublishOrder.includes(TrackType.SCREEN_SHARE_AUDIO)) { - this.trackPublishOrder.push(TrackType.SCREEN_SHARE_AUDIO); - } - await this.publisher.publishStream( - screenShareStream, + pushToIfMissing(this.trackPublishOrder, TrackType.SCREEN_SHARE_AUDIO); + await this.publisher.publish( screenShareAudioTrack, TrackType.SCREEN_SHARE_AUDIO, ); } + await this.updateLocalStreamState( + screenShareStream, + ...(screenShareAudioTrack + ? [TrackType.SCREEN_SHARE, TrackType.SCREEN_SHARE_AUDIO] + : [TrackType.SCREEN_SHARE]), + ); }; /** * Stops publishing the given track type to the call, if it is currently being published. - * Underlying track will be stopped and removed from the publisher. * - * @param trackType the track type to stop publishing. - * @param stopTrack if `true` the track will be stopped, else it will be just disabled + * @param trackTypes the track types to stop publishing. */ - stopPublish = async (trackType: TrackType, stopTrack: boolean = true) => { - this.logger( - 'info', - `stopPublish ${TrackType[trackType]}, stop tracks: ${stopTrack}`, + stopPublish = async (...trackTypes: TrackType[]) => { + if (!this.sfuClient || !this.publisher) return; + await this.updateLocalStreamState(undefined, ...trackTypes); + }; + + /** + * Updates the call state with the new stream. + * + * @param mediaStream the new stream to update the call state with. + * If undefined, the stream will be removed from the call state. + * @param trackTypes the track types to update the call state with. + */ + private updateLocalStreamState = async ( + mediaStream: MediaStream | undefined, + ...trackTypes: TrackType[] + ) => { + if (!this.sfuClient || !this.sfuClient.sessionId) return; + await this.sfuClient.updateMuteStates( + trackTypes.map((trackType) => ({ trackType, muted: !mediaStream })), ); - await this.publisher?.unpublishStream(trackType, stopTrack); + + const sessionId = this.sfuClient.sessionId; + for (const trackType of trackTypes) { + const streamStateProp = trackTypeToParticipantStreamKey(trackType); + if (!streamStateProp) continue; + + this.state.updateParticipant(sessionId, (p) => ({ + publishedTracks: mediaStream + ? pushToIfMissing([...p.publishedTracks], trackType) + : p.publishedTracks.filter((t) => t !== trackType), + [streamStateProp]: mediaStream, + })); + } }; /** diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index c58e47dd50..2b50d8e042 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -20,19 +20,19 @@ import { SendAnswerRequest, SendStatsRequest, SetPublisherRequest, + TrackMuteState, TrackSubscriptionDetails, - UpdateMuteStatesRequest, } from './gen/video/sfu/signal_rpc/signal'; -import { ICETrickle, TrackType } from './gen/video/sfu/models/models'; +import { ICETrickle } from './gen/video/sfu/models/models'; import { StreamClient } from './coordinator/connection/client'; import { generateUUIDv4 } from './coordinator/connection/utils'; import { Credentials } from './gen/coordinator'; import { Logger } from './coordinator/connection/types'; import { getLogger, getLogLevel } from './logger'; import { - promiseWithResolvers, - PromiseWithResolvers, makeSafePromise, + PromiseWithResolvers, + promiseWithResolvers, SafePromise, } from './helpers/promise'; import { getTimers } from './timers'; @@ -340,17 +340,11 @@ export class StreamSfuClient { ); }; - updateMuteState = async (trackType: TrackType, muted: boolean) => { - await this.joinTask; - return this.updateMuteStates({ muteStates: [{ trackType, muted }] }); - }; - - updateMuteStates = async ( - data: Omit, - ) => { + updateMuteStates = async (muteStates: TrackMuteState[]) => { await this.joinTask; return retryable( - () => this.rpc.updateMuteStates({ ...data, sessionId: this.sessionId }), + () => + this.rpc.updateMuteStates({ muteStates, sessionId: this.sessionId }), this.abortController.signal, ); }; diff --git a/packages/client/src/__tests__/Call.publishing.test.ts b/packages/client/src/__tests__/Call.publishing.test.ts new file mode 100644 index 0000000000..0015c2b483 --- /dev/null +++ b/packages/client/src/__tests__/Call.publishing.test.ts @@ -0,0 +1,264 @@ +import '../rtc/__tests__/mocks/webrtc.mocks'; + +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { Call } from '../Call'; +import { Publisher } from '../rtc'; +import { StreamClient } from '../coordinator/connection/client'; +import { generateUUIDv4 } from '../coordinator/connection/utils'; +import { PermissionsContext } from '../permissions'; +import { OwnCapability } from '../gen/coordinator'; +import { StreamVideoWriteableStateStore } from '../store'; +import { TrackType } from '../gen/video/sfu/models/models'; +import { StreamSfuClient } from '../StreamSfuClient'; + +describe('Publishing and Unpublishing tracks', () => { + let call: Call; + + beforeEach(async () => { + call = new Call({ + type: 'test', + id: generateUUIDv4(), + streamClient: new StreamClient('abc'), + clientStore: new StreamVideoWriteableStateStore(), + }); + + const ctx = new PermissionsContext(); + ctx.setPermissions([ + OwnCapability.SEND_AUDIO, + OwnCapability.SEND_VIDEO, + OwnCapability.SCREENSHARE, + ]); + // @ts-expect-error permissionsContext is private + call['permissionsContext'] = ctx; + }); + + describe('Validations', () => { + it('publishing is not allowed only when call is not joined', async () => { + const ms = new MediaStream(); + const err = 'Call not joined yet.'; + await expect(call.publishVideoStream(ms)).rejects.toThrowError(err); + await expect(call.publishAudioStream(ms)).rejects.toThrowError(err); + await expect(call.publishScreenShareStream(ms)).rejects.toThrowError(err); + }); + + it('publishing is not allowed when permissions are not set', async () => { + // @ts-expect-error sfuClient is private + call['sfuClient'] = {}; + + call['permissionsContext'].setPermissions([]); + + const ms = new MediaStream(); + await expect(call.publishVideoStream(ms)).rejects.toThrowError( + `No permission to publish video`, + ); + await expect(call.publishAudioStream(ms)).rejects.toThrowError( + 'No permission to publish audio', + ); + await expect(call.publishScreenShareStream(ms)).rejects.toThrowError( + 'No permission to publish screen share', + ); + }); + + it('publishing is not allowed when the publisher is not initialized', async () => { + // @ts-expect-error sfuClient is private + call['sfuClient'] = {}; + + const ms = new MediaStream(); + await expect(call.publishVideoStream(ms)).rejects.toThrowError( + 'Publisher is not initialized', + ); + await expect(call.publishAudioStream(ms)).rejects.toThrowError( + 'Publisher is not initialized', + ); + await expect(call.publishScreenShareStream(ms)).rejects.toThrowError( + 'Publisher is not initialized', + ); + }); + + it('publishing is not allowed when there are no tracks in the stream', async () => { + // @ts-expect-error sfuClient is private + call['sfuClient'] = {}; + // @ts-expect-error publisher is private + call['publisher'] = {}; + + const ms = new MediaStream(); + vi.spyOn(ms, 'getVideoTracks').mockReturnValue([]); + vi.spyOn(ms, 'getAudioTracks').mockReturnValue([]); + + await expect(call.publishVideoStream(ms)).rejects.toThrowError( + 'There is no video track in the stream', + ); + await expect(call.publishAudioStream(ms)).rejects.toThrowError( + 'There is no audio track in the stream', + ); + await expect(call.publishScreenShareStream(ms)).rejects.toThrowError( + 'There is no screen share track in the stream', + ); + }); + }); + + describe('Publishing and Unpublishing', () => { + const sessionId = 'abc'; + let publisher: Publisher; + let sfuClient: StreamSfuClient; + + beforeEach(() => { + // @ts-expect-error partial data + call.state.updateOrAddParticipant(sessionId, { + sessionId, + publishedTracks: [], + }); + + sfuClient = vi.fn() as unknown as StreamSfuClient; + // @ts-expect-error sessionId is readonly + sfuClient['sessionId'] = sessionId; + sfuClient.updateMuteStates = vi.fn(); + + publisher = vi.fn() as unknown as Publisher; + publisher.publish = vi.fn(); + + call['sfuClient'] = sfuClient; + call.publisher = publisher; + }); + + it('publish video stream', async () => { + const track = new MediaStreamTrack(); + const mediaStream = new MediaStream(); + vi.spyOn(mediaStream, 'getVideoTracks').mockReturnValue([track]); + + await call.publishVideoStream(mediaStream); + expect(publisher.publish).toHaveBeenCalledWith(track, TrackType.VIDEO); + expect(call['trackPublishOrder']).toEqual([TrackType.VIDEO]); + + expect(sfuClient.updateMuteStates).toHaveBeenCalledWith([ + { trackType: TrackType.VIDEO, muted: false }, + ]); + + const participant = call.state.findParticipantBySessionId(sessionId); + expect(participant).toBeDefined(); + expect(participant!.publishedTracks).toEqual([TrackType.VIDEO]); + expect(participant!.videoStream).toEqual(mediaStream); + }); + + it('publish audio stream', async () => { + const track = new MediaStreamTrack(); + const mediaStream = new MediaStream(); + vi.spyOn(mediaStream, 'getAudioTracks').mockReturnValue([track]); + + await call.publishAudioStream(mediaStream); + expect(publisher.publish).toHaveBeenCalledWith(track, TrackType.AUDIO); + expect(call['trackPublishOrder']).toEqual([TrackType.AUDIO]); + + expect(sfuClient.updateMuteStates).toHaveBeenCalledWith([ + { trackType: TrackType.AUDIO, muted: false }, + ]); + + const participant = call.state.findParticipantBySessionId(sessionId); + expect(participant).toBeDefined(); + expect(participant!.publishedTracks).toEqual([TrackType.AUDIO]); + expect(participant!.audioStream).toEqual(mediaStream); + }); + + it('publish screen share stream', async () => { + const track = new MediaStreamTrack(); + const mediaStream = new MediaStream(); + vi.spyOn(mediaStream, 'getVideoTracks').mockReturnValue([track]); + + await call.publishScreenShareStream(mediaStream); + expect(publisher.publish).toHaveBeenCalledWith( + track, + TrackType.SCREEN_SHARE, + ); + expect(call['trackPublishOrder']).toEqual([TrackType.SCREEN_SHARE]); + + expect(sfuClient.updateMuteStates).toHaveBeenCalledWith([ + { trackType: TrackType.SCREEN_SHARE, muted: false }, + ]); + + const participant = call.state.findParticipantBySessionId(sessionId); + expect(participant).toBeDefined(); + expect(participant!.publishedTracks).toEqual([TrackType.SCREEN_SHARE]); + expect(participant!.screenShareStream).toEqual(mediaStream); + }); + + it('publish screen share stream with audio', async () => { + const videoTrack = new MediaStreamTrack(); + const audioTrack = new MediaStreamTrack(); + const mediaStream = new MediaStream(); + vi.spyOn(mediaStream, 'getVideoTracks').mockReturnValue([videoTrack]); + vi.spyOn(mediaStream, 'getAudioTracks').mockReturnValue([audioTrack]); + + await call.publishScreenShareStream(mediaStream); + expect(publisher.publish).toHaveBeenCalledWith( + videoTrack, + TrackType.SCREEN_SHARE, + ); + expect(publisher.publish).toHaveBeenCalledWith( + audioTrack, + TrackType.SCREEN_SHARE_AUDIO, + ); + expect(call['trackPublishOrder']).toEqual([ + TrackType.SCREEN_SHARE, + TrackType.SCREEN_SHARE_AUDIO, + ]); + + expect(sfuClient.updateMuteStates).toHaveBeenCalledWith([ + { trackType: TrackType.SCREEN_SHARE, muted: false }, + { trackType: TrackType.SCREEN_SHARE_AUDIO, muted: false }, + ]); + + const participant = call.state.findParticipantBySessionId(sessionId); + expect(participant).toBeDefined(); + expect(participant!.publishedTracks).toEqual([ + TrackType.SCREEN_SHARE, + TrackType.SCREEN_SHARE_AUDIO, + ]); + expect(participant!.screenShareStream).toEqual(mediaStream); + expect(participant!.screenShareAudioStream).toEqual(mediaStream); + }); + + it('unpublish video stream', async () => { + const mediaStream = new MediaStream(); + call.state.updateParticipant(sessionId, { + publishedTracks: [TrackType.VIDEO, TrackType.AUDIO], + videoStream: mediaStream, + }); + await call.stopPublish(TrackType.VIDEO); + expect(publisher.publish).not.toHaveBeenCalled(); + const participant = call.state.findParticipantBySessionId(sessionId); + expect(participant!.publishedTracks).toEqual([TrackType.AUDIO]); + expect(participant!.videoStream).toBeUndefined(); + }); + + it('unpublish audio stream', async () => { + const mediaStream = new MediaStream(); + call.state.updateParticipant(sessionId, { + publishedTracks: [TrackType.VIDEO, TrackType.AUDIO], + audioStream: mediaStream, + }); + await call.stopPublish(TrackType.AUDIO); + expect(publisher.publish).not.toHaveBeenCalled(); + const participant = call.state.findParticipantBySessionId(sessionId); + expect(participant!.publishedTracks).toEqual([TrackType.VIDEO]); + expect(participant!.audioStream).toBeUndefined(); + }); + + it('unpublish screen share stream', async () => { + const mediaStream = new MediaStream(); + call.state.updateParticipant(sessionId, { + publishedTracks: [TrackType.SCREEN_SHARE, TrackType.SCREEN_SHARE_AUDIO], + screenShareStream: mediaStream, + screenShareAudioStream: mediaStream, + }); + await call.stopPublish( + TrackType.SCREEN_SHARE, + TrackType.SCREEN_SHARE_AUDIO, + ); + expect(publisher.publish).not.toHaveBeenCalled(); + const participant = call.state.findParticipantBySessionId(sessionId); + expect(participant!.publishedTracks).toEqual([]); + expect(participant!.screenShareStream).toBeUndefined(); + expect(participant!.screenShareAudioStream).toBeUndefined(); + }); + }); +}); diff --git a/packages/client/src/devices/CameraManager.ts b/packages/client/src/devices/CameraManager.ts index c0e326c86c..51a136204c 100644 --- a/packages/client/src/devices/CameraManager.ts +++ b/packages/client/src/devices/CameraManager.ts @@ -111,7 +111,7 @@ export class CameraManager extends InputMediaDeviceManager { return this.call.publishVideoStream(stream); } - protected stopPublishStream(stopTracks: boolean): Promise { - return this.call.stopPublish(TrackType.VIDEO, stopTracks); + protected stopPublishStream(): Promise { + return this.call.stopPublish(TrackType.VIDEO); } } diff --git a/packages/client/src/devices/InputMediaDeviceManager.ts b/packages/client/src/devices/InputMediaDeviceManager.ts index ad31d160e9..e69184fb27 100644 --- a/packages/client/src/devices/InputMediaDeviceManager.ts +++ b/packages/client/src/devices/InputMediaDeviceManager.ts @@ -243,41 +243,39 @@ export abstract class InputMediaDeviceManager< protected abstract publishStream(stream: MediaStream): Promise; - protected abstract stopPublishStream(stopTracks: boolean): Promise; + protected abstract stopPublishStream(): Promise; protected getTracks(): MediaStreamTrack[] { return this.state.mediaStream?.getTracks() ?? []; } protected async muteStream(stopTracks: boolean = true) { - if (!this.state.mediaStream) return; + const mediaStream = this.state.mediaStream; + if (!mediaStream) return; this.logger('debug', `${stopTracks ? 'Stopping' : 'Disabling'} stream`); if (this.call.state.callingState === CallingState.JOINED) { - await this.stopPublishStream(stopTracks); + await this.stopPublishStream(); } this.muteLocalStream(stopTracks); const allEnded = this.getTracks().every((t) => t.readyState === 'ended'); if (allEnded) { - if ( - this.state.mediaStream && - // @ts-expect-error release() is present in react-native-webrtc - typeof this.state.mediaStream.release === 'function' - ) { + // @ts-expect-error release() is present in react-native-webrtc + if (typeof mediaStream.release === 'function') { // @ts-expect-error called to dispose the stream in RN - this.state.mediaStream.release(); + mediaStream.release(); } this.state.setMediaStream(undefined, undefined); this.filters.forEach((entry) => entry.stop?.()); } } - private muteTracks() { + private disableTracks() { this.getTracks().forEach((track) => { if (track.enabled) track.enabled = false; }); } - private unmuteTracks() { + private enableTracks() { this.getTracks().forEach((track) => { if (!track.enabled) track.enabled = true; }); @@ -296,7 +294,7 @@ export abstract class InputMediaDeviceManager< if (stopTracks) { this.stopTracks(); } else { - this.muteTracks(); + this.disableTracks(); } } @@ -309,7 +307,7 @@ export abstract class InputMediaDeviceManager< this.getTracks().every((t) => t.readyState === 'live') ) { stream = this.state.mediaStream; - this.unmuteTracks(); + this.enableTracks(); } else { const defaultConstraints = this.state.defaultConstraints; const constraints: MediaTrackConstraints = { @@ -447,11 +445,8 @@ export abstract class InputMediaDeviceManager< let isDeviceDisconnected = false; let isDeviceReplaced = false; - const currentDevice = this.findDeviceInList( - currentDevices, - deviceId, - ); - const prevDevice = this.findDeviceInList(prevDevices, deviceId); + const currentDevice = this.findDevice(currentDevices, deviceId); + const prevDevice = this.findDevice(prevDevices, deviceId); if (!currentDevice && prevDevice) { isDeviceDisconnected = true; } else if ( @@ -490,9 +485,8 @@ export abstract class InputMediaDeviceManager< ); } - private findDeviceInList(devices: MediaDeviceInfo[], deviceId: string) { - return devices.find( - (d) => d.deviceId === deviceId && d.kind === this.mediaDeviceKind, - ); + private findDevice(devices: MediaDeviceInfo[], deviceId: string) { + const kind = this.mediaDeviceKind; + return devices.find((d) => d.deviceId === deviceId && d.kind === kind); } } diff --git a/packages/client/src/devices/MicrophoneManager.ts b/packages/client/src/devices/MicrophoneManager.ts index c3a961050e..94bf042b71 100644 --- a/packages/client/src/devices/MicrophoneManager.ts +++ b/packages/client/src/devices/MicrophoneManager.ts @@ -215,8 +215,8 @@ export class MicrophoneManager extends InputMediaDeviceManager { - return this.call.stopPublish(TrackType.AUDIO, stopTracks); + protected stopPublishStream(): Promise { + return this.call.stopPublish(TrackType.AUDIO); } private async startSpeakingWhileMutedDetection(deviceId?: string) { diff --git a/packages/client/src/devices/ScreenShareManager.ts b/packages/client/src/devices/ScreenShareManager.ts index b71b617bf5..d947fb4a73 100644 --- a/packages/client/src/devices/ScreenShareManager.ts +++ b/packages/client/src/devices/ScreenShareManager.ts @@ -46,7 +46,7 @@ export class ScreenShareManager extends InputMediaDeviceManager< async disableScreenShareAudio(): Promise { this.state.setAudioEnabled(false); if (this.call.publisher?.isPublishing(TrackType.SCREEN_SHARE_AUDIO)) { - await this.call.stopPublish(TrackType.SCREEN_SHARE_AUDIO, true); + await this.call.stopPublish(TrackType.SCREEN_SHARE_AUDIO); } } @@ -83,9 +83,11 @@ export class ScreenShareManager extends InputMediaDeviceManager< return this.call.publishScreenShareStream(stream); } - protected async stopPublishStream(stopTracks: boolean): Promise { - await this.call.stopPublish(TrackType.SCREEN_SHARE, stopTracks); - await this.call.stopPublish(TrackType.SCREEN_SHARE_AUDIO, stopTracks); + protected async stopPublishStream(): Promise { + return this.call.stopPublish( + TrackType.SCREEN_SHARE, + TrackType.SCREEN_SHARE_AUDIO, + ); } /** diff --git a/packages/client/src/devices/__tests__/CameraManager.test.ts b/packages/client/src/devices/__tests__/CameraManager.test.ts index 504c84656e..5e445b2105 100644 --- a/packages/client/src/devices/__tests__/CameraManager.test.ts +++ b/packages/client/src/devices/__tests__/CameraManager.test.ts @@ -105,10 +105,7 @@ describe('CameraManager', () => { await manager.disable(); - expect(manager['call'].stopPublish).toHaveBeenCalledWith( - TrackType.VIDEO, - true, - ); + expect(manager['call'].stopPublish).toHaveBeenCalledWith(TrackType.VIDEO); }); it('flip', async () => { diff --git a/packages/client/src/devices/__tests__/InputMediaDeviceManager.test.ts b/packages/client/src/devices/__tests__/InputMediaDeviceManager.test.ts index 3d147feee4..00a3f8e276 100644 --- a/packages/client/src/devices/__tests__/InputMediaDeviceManager.test.ts +++ b/packages/client/src/devices/__tests__/InputMediaDeviceManager.test.ts @@ -4,11 +4,11 @@ import { CallingState, StreamVideoWriteableStateStore } from '../../store'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { - MockTrack, emitDeviceIds, mockBrowserPermission, mockCall, mockDeviceIds$, + MockTrack, mockVideoDevices, mockVideoStream, } from './mocks'; @@ -123,7 +123,7 @@ describe('InputMediaDeviceManager.test', () => { await manager.disable(); - expect(manager.stopPublishStream).toHaveBeenCalledWith(true); + expect(manager.stopPublishStream).toHaveBeenCalled(); }); it('disable device with forceStop', async () => { @@ -134,7 +134,7 @@ describe('InputMediaDeviceManager.test', () => { await manager.disable(true); - expect(manager.stopPublishStream).toHaveBeenCalledWith(true); + expect(manager.stopPublishStream).toHaveBeenCalled(); expect(manager.state.mediaStream).toBeUndefined(); expect(manager.state.status).toBe('disabled'); }); @@ -179,7 +179,7 @@ describe('InputMediaDeviceManager.test', () => { const deviceId = mockVideoDevices[1].deviceId; await manager.select(deviceId); - expect(manager.stopPublishStream).toHaveBeenCalledWith(true); + expect(manager.stopPublishStream).toHaveBeenCalled(); expect(manager.getStream).toHaveBeenCalledWith({ deviceId: { exact: deviceId }, }); diff --git a/packages/client/src/devices/__tests__/MicrophoneManager.test.ts b/packages/client/src/devices/__tests__/MicrophoneManager.test.ts index 58f527f507..8cd3ea12a3 100644 --- a/packages/client/src/devices/__tests__/MicrophoneManager.test.ts +++ b/packages/client/src/devices/__tests__/MicrophoneManager.test.ts @@ -60,7 +60,7 @@ class NoiseCancellationStub implements INoiseCancellation { enable = () => this.listeners['change']?.forEach((l) => l(true)); disable = () => this.listeners['change']?.forEach((l) => l(false)); dispose = () => Promise.resolve(undefined); - toFilter = () => async (ms: MediaStream) => ms; + toFilter = () => (ms: MediaStream) => ({ output: ms }); on = (event, callback) => { (this.listeners[event] ??= []).push(callback); return () => {}; @@ -121,10 +121,7 @@ describe('MicrophoneManager', () => { await manager.disable(); - expect(manager['call'].stopPublish).toHaveBeenCalledWith( - TrackType.AUDIO, - false, - ); + expect(manager['call'].stopPublish).toHaveBeenCalledWith(TrackType.AUDIO); }); it('disable-enable mic should set track.enabled', async () => { diff --git a/packages/client/src/devices/__tests__/ScreenShareManager.test.ts b/packages/client/src/devices/__tests__/ScreenShareManager.test.ts index 8c156298ca..0c6f5cc790 100644 --- a/packages/client/src/devices/__tests__/ScreenShareManager.test.ts +++ b/packages/client/src/devices/__tests__/ScreenShareManager.test.ts @@ -49,8 +49,8 @@ describe('ScreenShareManager', () => { expect(RxUtils.getCurrentValue(devices)).toEqual([]); }); - it('select device', () => { - expect(manager.select('any-device-id')).rejects.toThrowError(); + it('select device', async () => { + await expect(manager.select('any-device-id')).rejects.toThrowError(); }); it('get stream', async () => { @@ -125,10 +125,9 @@ describe('ScreenShareManager', () => { await manager.disable(); expect(manager.state.status).toEqual('disabled'); - expect(call.stopPublish).toHaveBeenCalledWith(TrackType.SCREEN_SHARE, true); expect(call.stopPublish).toHaveBeenCalledWith( + TrackType.SCREEN_SHARE, TrackType.SCREEN_SHARE_AUDIO, - true, ); }); }); diff --git a/packages/client/src/events/participant.ts b/packages/client/src/events/participant.ts index 4da4658daf..4cc9b900c8 100644 --- a/packages/client/src/events/participant.ts +++ b/packages/client/src/events/participant.ts @@ -13,6 +13,7 @@ import { } from '../types'; import { CallState } from '../store'; import { trackTypeToParticipantStreamKey } from '../rtc'; +import { pushToIfMissing } from '../helpers/array'; /** * An event responder which handles the `participantJoined` event. @@ -88,7 +89,7 @@ export const watchTrackPublished = (state: CallState) => { state.updateOrAddParticipant(sessionId, participant); } else { state.updateParticipant(sessionId, (p) => ({ - publishedTracks: [...p.publishedTracks, type].filter(unique), + publishedTracks: pushToIfMissing([...p.publishedTracks], type), })); } }; @@ -114,8 +115,6 @@ export const watchTrackUnpublished = (state: CallState) => { }; }; -const unique = (v: T, i: number, arr: T[]) => arr.indexOf(v) === i; - /** * Reconciles orphaned tracks (if any) for the given participant. * diff --git a/packages/client/src/helpers/array.ts b/packages/client/src/helpers/array.ts new file mode 100644 index 0000000000..0d062d25c9 --- /dev/null +++ b/packages/client/src/helpers/array.ts @@ -0,0 +1,14 @@ +/** + * Adds unique values to an array. + * + * @param arr the array to add to. + * @param values the values to add. + */ +export const pushToIfMissing = (arr: T[], ...values: T[]): T[] => { + for (const v of values) { + if (!arr.includes(v)) { + arr.push(v); + } + } + return arr; +}; diff --git a/packages/client/src/rtc/Publisher.ts b/packages/client/src/rtc/Publisher.ts index 257e2fe4bc..82184ec25f 100644 --- a/packages/client/src/rtc/Publisher.ts +++ b/packages/client/src/rtc/Publisher.ts @@ -17,10 +17,7 @@ import { toVideoLayers, } from './videoLayers'; import { isSvcCodec } from './codecs'; -import { - isAudioTrackType, - trackTypeToParticipantStreamKey, -} from './helpers/tracks'; +import { isAudioTrackType } from './helpers/tracks'; import { extractMid } from './helpers/sdp'; import { withoutConcurrency } from '../helpers/concurrency'; @@ -35,7 +32,6 @@ export type PublisherConstructorOpts = BasePeerConnectionOpts & { */ export class Publisher extends BasePeerConnection { private readonly transceiverCache = new TransceiverCache(); - private readonly knownTrackIds = new Set(); private readonly unsubscribeOnIceRestart: () => void; private readonly unsubscribeChangePublishQuality: () => void; @@ -119,15 +115,10 @@ export class Publisher extends BasePeerConnection { * Consecutive calls to this method will replace the stream. * The previous stream will be stopped. * - * @param mediaStream the media stream to publish. * @param track the track to publish. * @param trackType the track type to publish. */ - publishStream = async ( - mediaStream: MediaStream, - track: MediaStreamTrack, - trackType: TrackType, - ) => { + publish = async (track: MediaStreamTrack, trackType: TrackType) => { if (track.readyState === 'ended') { throw new Error(`Can't publish a track that has ended already.`); } @@ -136,27 +127,6 @@ export class Publisher extends BasePeerConnection { throw new Error(`No publish options found for ${TrackType[trackType]}`); } - // enable the track if it is disabled - if (!track.enabled) track.enabled = true; - - if (!this.knownTrackIds.has(track.id)) { - // listen for 'ended' event on the track as it might be ended abruptly - // by an external factors such as permission revokes, a disconnected device, etc. - // keep in mind that `track.stop()` doesn't trigger this event. - const handleTrackEnded = () => { - this.logger('info', `Track ${TrackType[trackType]} has ended abruptly`); - track.removeEventListener('ended', handleTrackEnded); - this.notifyTrackMuteStateChanged(mediaStream, trackType, true).catch( - (err) => this.logger('warn', `Couldn't notify track mute state`, err), - ); - }; - track.addEventListener('ended', handleTrackEnded); - - // we now publish clones, hence we need to keep track of the original track ids - // to avoid assigning the same event listener multiple times - this.knownTrackIds.add(track.id); - } - for (const publishOption of this.publishOptions) { if (publishOption.trackType !== trackType) continue; @@ -168,11 +138,9 @@ export class Publisher extends BasePeerConnection { if (!transceiver) { this.addTransceiver(trackToPublish, publishOption); } else { - await this.updateTransceiver(transceiver, trackToPublish); + await transceiver.sender.replaceTrack(trackToPublish); } } - - await this.notifyTrackMuteStateChanged(mediaStream, trackType, false); }; /** @@ -198,22 +166,6 @@ export class Publisher extends BasePeerConnection { this.transceiverCache.add(publishOption, transceiver); }; - /** - * Updates the given transceiver with the new track. - * Stops the previous track and replaces it with the new one. - */ - private updateTransceiver = async ( - transceiver: RTCRtpTransceiver, - track: MediaStreamTrack, - ) => { - const previousTrack = transceiver.sender.track; - // don't stop the track if we are re-publishing the same track - if (previousTrack && previousTrack !== track) { - previousTrack.stop(); - } - await transceiver.sender.replaceTrack(track); - }; - /** * Switches the codec of the given track type. */ @@ -252,32 +204,6 @@ export class Publisher extends BasePeerConnection { } }; - /** - * Stops publishing the given track type to the SFU, if it is currently being published. - * Underlying track will be stopped and removed from the publisher. - * @param trackType the track type to unpublish. - * @param stopTrack specifies whether track should be stopped or just disabled - */ - unpublishStream = async (trackType: TrackType, stopTrack: boolean) => { - for (const option of this.publishOptions) { - if (option.trackType !== trackType) continue; - - const transceiver = this.transceiverCache.get(option); - const track = transceiver?.sender.track; - if (!track) continue; - - if (stopTrack && track.readyState === 'live') { - track.stop(); - } else if (track.enabled) { - track.enabled = false; - } - } - - if (this.state.localParticipant?.publishedTracks.includes(trackType)) { - await this.notifyTrackMuteStateChanged(undefined, trackType, true); - } - }; - /** * Returns true if the given track type is currently being published to the SFU. * @@ -308,34 +234,6 @@ export class Publisher extends BasePeerConnection { return undefined; }; - // FIXME move to InputMediaDeviceManager - private notifyTrackMuteStateChanged = async ( - mediaStream: MediaStream | undefined, - trackType: TrackType, - isMuted: boolean, - ) => { - await this.sfuClient.updateMuteState(trackType, isMuted); - - const audioOrVideoOrScreenShareStream = - trackTypeToParticipantStreamKey(trackType); - if (!audioOrVideoOrScreenShareStream) return; - if (isMuted) { - this.state.updateParticipant(this.sfuClient.sessionId, (p) => ({ - publishedTracks: p.publishedTracks.filter((t) => t !== trackType), - [audioOrVideoOrScreenShareStream]: undefined, - })); - } else { - this.state.updateParticipant(this.sfuClient.sessionId, (p) => { - return { - publishedTracks: p.publishedTracks.includes(trackType) - ? p.publishedTracks - : [...p.publishedTracks, trackType], - [audioOrVideoOrScreenShareStream]: mediaStream, - }; - }); - } - }; - /** * Stops publishing all tracks and stop all tracks. */ @@ -514,8 +412,7 @@ export class Publisher extends BasePeerConnection { * Returns a list of tracks that are currently being published. * @param sdp an optional SDP to extract the `mid` from. */ - getAnnouncedTracks = (sdp?: string): TrackInfo[] => { - sdp = sdp || this.pc.localDescription?.sdp; + getAnnouncedTracks = (sdp: string | undefined): TrackInfo[] => { const trackInfos: TrackInfo[] = []; for (const bundle of this.transceiverCache.items()) { const { transceiver, publishOption } = bundle; diff --git a/packages/client/src/rtc/__tests__/Publisher.test.ts b/packages/client/src/rtc/__tests__/Publisher.test.ts index 0c7b2ee0a0..1c28cdcd3e 100644 --- a/packages/client/src/rtc/__tests__/Publisher.test.ts +++ b/packages/client/src/rtc/__tests__/Publisher.test.ts @@ -81,143 +81,60 @@ describe('Publisher', () => { dispatcher.offAll(); }); - it('can publish, re-publish and un-publish a stream', async () => { - const mediaStream = new MediaStream(); - const track = new MediaStreamTrack(); - mediaStream.addTrack(track); - - state.setParticipants([ - // @ts-ignore - { - isLocalParticipant: true, - userId: 'test-user-id', - sessionId: sessionId, - publishedTracks: [], - }, - ]); + describe('Publishing', () => { + it('should throw when publishing ended tracks', async () => { + const track = new MediaStreamTrack(); + // @ts-ignore readonly field + track.readyState = 'ended'; + await expect(publisher.publish(track, TrackType.VIDEO)).rejects.toThrow(); + }); - vi.spyOn(track, 'getSettings').mockReturnValue({ - width: 640, - height: 480, - deviceId: 'test-device-id', + it('should throw when attempting to publish a track that has no publish options', async () => { + const track = new MediaStreamTrack(); + await expect(publisher.publish(track, TrackType.AUDIO)).rejects.toThrow(); }); - vi.spyOn(track, 'clone').mockReturnValue(track); - - const transceiver = new RTCRtpTransceiver(); - vi.spyOn(transceiver.sender, 'track', 'get').mockReturnValue(track); - vi.spyOn(publisher['pc'], 'addTransceiver').mockReturnValue(transceiver); - vi.spyOn(publisher['pc'], 'getTransceivers').mockReturnValue([transceiver]); - - sfuClient.updateMuteState = vi.fn(); - - // initial publish - await publisher.publishStream(mediaStream, track, TrackType.VIDEO); - - expect(state.localParticipant?.publishedTracks).toContain(TrackType.VIDEO); - expect(state.localParticipant?.videoStream).toEqual(mediaStream); - expect(sfuClient.updateMuteState).toHaveBeenCalledWith( - TrackType.VIDEO, - false, - ); - expect(track.addEventListener).toHaveBeenCalledWith( - 'ended', - expect.any(Function), - ); - - // re-publish a new track - const newMediaStream = new MediaStream(); - const newTrack = new MediaStreamTrack(); - newMediaStream.addTrack(newTrack); - - vi.spyOn(newTrack, 'getSettings').mockReturnValue({ - width: 1280, - height: 720, - deviceId: 'test-device-id-2', + + it('should add a transceiver for new tracks', async () => { + const track = new MediaStreamTrack(); + const clone = new MediaStreamTrack(); + vi.spyOn(track, 'clone').mockReturnValue(clone); + + await publisher.publish(track, TrackType.VIDEO); + + expect(track.clone).toHaveBeenCalled(); + expect(publisher['pc'].addTransceiver).toHaveBeenCalledWith(clone, { + direction: 'sendonly', + sendEncodings: [ + { + rid: 'q', + active: true, + maxBitrate: 1000, + height: 720, + width: 1280, + maxFramerate: 30, + scalabilityMode: 'L3T3_KEY', + }, + ], + }); }); - vi.spyOn(newTrack, 'clone').mockReturnValue(newTrack); - - await publisher.publishStream(newMediaStream, newTrack, TrackType.VIDEO); - vi.spyOn(transceiver.sender, 'track', 'get').mockReturnValue(newTrack); - - expect(track.stop).toHaveBeenCalled(); - expect(newTrack.addEventListener).toHaveBeenCalledWith( - 'ended', - expect.any(Function), - ); - expect(transceiver.sender.replaceTrack).toHaveBeenCalledWith(newTrack); - - // stop publishing - await publisher.unpublishStream(TrackType.VIDEO, true); - expect(newTrack.stop).toHaveBeenCalled(); - expect(state.localParticipant?.publishedTracks).not.toContain( - TrackType.VIDEO, - ); - }); - it('can publish and un-publish with just enabling and disabling tracks', async () => { - const mediaStream = new MediaStream(); - const track = new MediaStreamTrack(); - mediaStream.addTrack(track); + it('should update an existing transceiver for a new track', async () => { + const track = new MediaStreamTrack(); + const clone = new MediaStreamTrack(); + vi.spyOn(track, 'clone').mockReturnValue(clone); - state.setParticipants([ - // @ts-ignore - { - isLocalParticipant: true, - userId: 'test-user-id', - sessionId: sessionId, - publishedTracks: [], - }, - ]); + const transceiver = new RTCRtpTransceiver(); + publisher['transceiverCache'].add( + publisher['publishOptions'][0], + transceiver, + ); + + await publisher.publish(track, TrackType.VIDEO); - vi.spyOn(track, 'getSettings').mockReturnValue({ - width: 640, - height: 480, - deviceId: 'test-device-id', + expect(track.clone).toHaveBeenCalled(); + expect(publisher['pc'].addTransceiver).not.toHaveBeenCalled(); + expect(transceiver.sender.replaceTrack).toHaveBeenCalledWith(clone); }); - vi.spyOn(track, 'clone').mockReturnValue(track); - - const transceiver = new RTCRtpTransceiver(); - vi.spyOn(transceiver.sender, 'track', 'get').mockReturnValue(track); - vi.spyOn(publisher['pc'], 'addTransceiver').mockReturnValue(transceiver); - vi.spyOn(publisher['pc'], 'getTransceivers').mockReturnValue([transceiver]); - - sfuClient.updateMuteState = vi.fn(); - - // initial publish - await publisher.publishStream(mediaStream, track, TrackType.VIDEO); - - expect(state.localParticipant?.publishedTracks).toContain(TrackType.VIDEO); - expect(track.enabled).toBe(true); - expect(state.localParticipant?.videoStream).toEqual(mediaStream); - expect(sfuClient.updateMuteState).toHaveBeenCalledWith( - TrackType.VIDEO, - false, - ); - - expect(track.addEventListener).toHaveBeenCalledWith( - 'ended', - expect.any(Function), - ); - - // stop publishing - await publisher.unpublishStream(TrackType.VIDEO, false); - expect(track.stop).not.toHaveBeenCalled(); - expect(track.enabled).toBe(false); - expect(state.localParticipant?.publishedTracks).not.toContain( - TrackType.VIDEO, - ); - expect(state.localParticipant?.videoStream).toBeUndefined(); - - const addEventListenerSpy = vi.spyOn(track, 'addEventListener'); - const removeEventListenerSpy = vi.spyOn(track, 'removeEventListener'); - - // start publish again - await publisher.publishStream(mediaStream, track, TrackType.VIDEO); - - expect(track.enabled).toBe(true); - // republishing the same stream should use the previously registered event handlers - expect(removeEventListenerSpy).not.toHaveBeenCalled(); - expect(addEventListenerSpy).not.toHaveBeenCalled(); }); describe('Publisher ICE Restart', () => { @@ -540,10 +457,6 @@ describe('Publisher', () => { const transceiver = new RTCRtpTransceiver(); const track = new MediaStreamTrack(); vi.spyOn(transceiver.sender, 'track', 'get').mockReturnValue(track); - vi.spyOn(track, 'getSettings').mockReturnValue({ - width: 640, - height: 480, - }); vi.spyOn(track, 'clone').mockReturnValue(track); // @ts-expect-error private method vi.spyOn(publisher, 'addTransceiver'); @@ -630,19 +543,11 @@ describe('Publisher', () => { const transceiver = new RTCRtpTransceiver(); const track = new MediaStreamTrack(); vi.spyOn(track, 'enabled', 'get').mockReturnValue(true); - vi.spyOn(track, 'getSettings').mockReturnValue({ - width: 640, - height: 480, - }); vi.spyOn(transceiver.sender, 'track', 'get').mockReturnValue(track); const inactiveTransceiver = new RTCRtpTransceiver(); const inactiveTrack = new MediaStreamTrack(); vi.spyOn(inactiveTrack, 'enabled', 'get').mockReturnValue(false); - vi.spyOn(inactiveTrack, 'getSettings').mockReturnValue({ - width: 640, - height: 480, - }); vi.spyOn(inactiveTransceiver.sender, 'track', 'get').mockReturnValue( inactiveTrack, ); @@ -725,7 +630,7 @@ describe('Publisher', () => { }); it('getAnnouncedTracks should return all tracks', () => { - const trackInfos = publisher.getAnnouncedTracks(); + const trackInfos = publisher.getAnnouncedTracks(''); expect(trackInfos).toHaveLength(2); expect(trackInfos[0].muted).toBe(false); expect(trackInfos[0].mid).toBe('0'); diff --git a/packages/client/src/rtc/__tests__/mocks/webrtc.mocks.ts b/packages/client/src/rtc/__tests__/mocks/webrtc.mocks.ts index 6992d74700..4459fcc0c4 100644 --- a/packages/client/src/rtc/__tests__/mocks/webrtc.mocks.ts +++ b/packages/client/src/rtc/__tests__/mocks/webrtc.mocks.ts @@ -25,8 +25,10 @@ vi.stubGlobal('RTCPeerConnection', RTCPeerConnectionMock); const MediaStreamMock = vi.fn((): Partial => { return { - getTracks: vi.fn(), + getTracks: vi.fn().mockReturnValue([]), addTrack: vi.fn(), + getAudioTracks: vi.fn().mockReturnValue([]), + getVideoTracks: vi.fn().mockReturnValue([]), }; }); vi.stubGlobal('MediaStream', MediaStreamMock); @@ -35,7 +37,7 @@ const MediaStreamTrackMock = vi.fn((): Partial => { return { addEventListener: vi.fn(), removeEventListener: vi.fn(), - getSettings: vi.fn(), + getSettings: vi.fn().mockReturnValue({ width: 1280, height: 720 }), stop: vi.fn(), clone: vi.fn(), readyState: 'live',