diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index fa8f9a1e..0b84524a 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -4,7 +4,8 @@ /* eslint-disable require-jsdoc */ /* global Promise, Map, WebTransport, WebTransportBidirectionalStream, - Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor */ + Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor, + proto */ 'use strict'; @@ -44,6 +45,10 @@ export class QuicConnection extends EventDispatcher { this._transportId = this._token.transportId; this._initReceiveStreamReader(); this._worker = new Worker(workerDir + '/media-worker.js', {type: 'module'}); + // Key is SSRC, value is a MediaStreamTrackGenerator writer. + this._mstGeneratorWriters = new Map(); + this._initRtpModule(); + this._initDatagramReader(); } /** @@ -77,6 +82,10 @@ export class QuicConnection extends EventDispatcher { await this._authenticate(this._tokenString); } + async _initRtpModule() { + this._worker.postMessage(['init-rtp']); + } + async _initReceiveStreamReader() { const receiveStreamReader = this._quicTransport.incomingBidirectionalStreams.getReader(); @@ -173,6 +182,20 @@ export class QuicConnection extends EventDispatcher { } } + async _initDatagramReader() { + const datagramReader = this._quicTransport.datagrams.readable.getReader(); + console.log('Get datagram reader.'); + let receivingDone = false; + while (!receivingDone) { + const {value: datagram, done: readingDatagramsDone} = + await datagramReader.read(); + this._worker.postMessage(['rtp-packet', datagram]); + if (readingDatagramsDone) { + receivingDone = true; + } + } + } + _createSubscription(id, receiveStream) { // TODO: Incomplete subscription. const subscription = new Subscription(id, () => { @@ -207,6 +230,94 @@ export class QuicConnection extends EventDispatcher { return quicStream; } + async bindFeedbackReader(stream, publicationId) { + // The receiver side of a publication stream starts with a UUID of + // publication ID, then each feedback message has a 4 bytes header indicates + // its length, and followed by protobuf encoded body. + const feedbackChunkReader = stream.readable.getReader(); + let feedbackChunksDone = false; + let publicationIdOffset = 0; + const headerSize=4; + const header = new Uint8Array(headerSize); + let headerOffset = 0; + let bodySize = 0; + let bodyOffset = 0; + let bodyBytes; + while (!feedbackChunksDone) { + let valueOffset=0; + const {value, done} = await feedbackChunkReader.read(); + Logger.debug(value); + while (valueOffset < value.byteLength) { + if (publicationIdOffset < uuidByteLength) { + // TODO: Check publication ID matches. For now, we just skip this ID. + const readLength = + Math.min(uuidByteLength - publicationIdOffset, value.byteLength); + valueOffset += readLength; + publicationIdOffset += readLength; + } + if (headerOffset < headerSize) { + // Read header. + const copyLength = Math.min( + headerSize - headerOffset, value.byteLength - valueOffset); + if (copyLength === 0) { + continue; + } + header.set( + value.subarray(valueOffset, valueOffset + copyLength), + headerOffset); + headerOffset += copyLength; + valueOffset += copyLength; + if (headerOffset < headerSize) { + continue; + } + bodySize = 0; + bodyOffset = 0; + for (let i = 0; i < headerSize; i++) { + bodySize += (header[i] << ((headerSize - 1 - i) * 8)); + } + bodyBytes = new Uint8Array(bodySize); + Logger.debug('Body size ' + bodySize); + } + if (bodyOffset < bodySize) { + const copyLength = + Math.min(bodySize - bodyOffset, value.byteLength - valueOffset); + if (copyLength === 0) { + continue; + } + Logger.debug('Bytes for body: '+copyLength); + bodyBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + bodyOffset); + bodyOffset += copyLength; + valueOffset += copyLength; + if (valueOffset < bodySize) { + continue; + } + // Decode body. + const feedback = proto.owt.protobuf.Feedback.deserializeBinary(bodyBytes); + this.handleFeedback(feedback, publicationId); + } + } + if (done) { + feedbackChunksDone = true; + break; + } + } + } + + async handleFeedback(feedback, publicationId) { + Logger.debug( + 'Key frame request type: ' + + proto.owt.protobuf.Feedback.Type.KEY_FRAME_REQUEST); + if (feedback.getType() === + proto.owt.protobuf.Feedback.Type.KEY_FRAME_REQUEST) { + this._worker.postMessage( + ['rtcp-feedback', ['key-frame-request', publicationId]]); + } else { + Logger.warning('Unrecognized feedback type ' + feedback.getType()); + } + } + async publish(stream, options) { // TODO: Avoid a stream to be published twice. The first 16 bit data send to // server must be it's publication ID. @@ -225,6 +336,7 @@ export class QuicConnection extends EventDispatcher { for (const track of stream.stream.getTracks()) { const quicStream = await this._quicTransport.createBidirectionalStream(); + this.bindFeedbackReader(quicStream, publicationId); this._quicMediaStreamTracks.set(track.id, quicStream); quicStreams.push(quicStream); } @@ -262,6 +374,7 @@ export class QuicConnection extends EventDispatcher { [ 'media-sender', [ + publicationId, track.id, track.kind, processor.readable, @@ -317,12 +430,12 @@ export class QuicConnection extends EventDispatcher { if (typeof options !== 'object') { return Promise.reject(new TypeError('Options should be an object.')); } - // if (options.audio === undefined) { - // options.audio = !!stream.settings.audio; - // } - // if (options.video === undefined) { - // options.video = !!stream.settings.video; - // } + if (options.audio === undefined) { + options.audio = !!stream.settings.audio; + } + if (options.video === undefined) { + options.video = !!stream.settings.video; + } let mediaOptions; let dataOptions; if (options.audio || options.video) { @@ -375,19 +488,40 @@ export class QuicConnection extends EventDispatcher { }) .then((data) => { this._subscribeOptions.set(data.id, options); - Logger.debug('Subscribe info is set.'); - if (this._quicDataStreams.has(data.id)) { - // QUIC stream created before signaling returns. - // TODO: Update subscription to accept list of QUIC streams. - const subscription = this._createSubscription( - data.id, this._quicDataStreams.get(data.id)[0]); - resolve(subscription); + if (dataOptions) { + // A WebTransport stream is associated with a subscription for + // data. + if (this._quicDataStreams.has(data.id)) { + // QUIC stream created before signaling returns. + // TODO: Update subscription to accept list of QUIC streams. + const subscription = this._createSubscription( + data.id, this._quicDataStreams.get(data.id)[0]); + resolve(subscription); + } else { + this._quicDataStreams.set(data.id, null); + // QUIC stream is not created yet, resolve promise after getting + // QUIC stream. + this._subscribePromises.set( + data.id, {resolve: resolve, reject: reject}); + } } else { - this._quicDataStreams.set(data.id, null); - // QUIC stream is not created yet, resolve promise after getting - // QUIC stream. - this._subscribePromises.set( - data.id, {resolve: resolve, reject: reject}); + // A MediaStream is associated with a subscription for media. + // Media packets are received over WebTransport datagram. + const ms = new Module.MediaSession(); + console.log('Created media session.'); + const generators = []; + for (const track of mediaOptions){ + const generator = + new MediaStreamTrackGenerator({kind: track.type}); + generators.push(generator); + // TODO: Update key with the correct SSRC. + this._mstGeneratorWriters.set( + 0, generator.writable.getWriter()); + } + const mediaStream = new MediaStream(generators); + const subscription = + this._createSubscription(data.id, mediaStream); + resolve(subscription); } if (this._subscriptionInfoReady.has(data.id)) { this._subscriptionInfoReady.get(data.id)(); diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js index dd571817..e65808c0 100644 --- a/src/sdk/conference/webtransport/media-worker.js +++ b/src/sdk/conference/webtransport/media-worker.js @@ -4,43 +4,91 @@ /* eslint-disable require-jsdoc */ /* global AudioEncoder, VideoEncoder, VideoDecoder, Map, ArrayBuffer, - Uint8Array, DataView */ + Uint8Array, DataView, proto */ -import Logger from '../../base/logger.js'; +import initModule from '/src/samples/conference/public/scripts/owt.js'; // Key is MediaStreamTrack ID, value is AudioEncoder or VideoEncoder. const encoders = new Map(); // Key is MediaStreamTrack ID, value is WritableStreamDefaultWriter. const writers = new Map(); +// Key is publication ID, value is bool indicates whether key frame is requested +// for its video track. +const keyFrameRequested = new Map(); +let mediaSession; let frameBuffer; let videoDecoder; // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. const sizePrefix = 4; /* Messages it accepts: - * media-sender: [MediaStreamTrack, WebTransportStream, + * media-sender: [Publication ID, MediaStreamTrack ID, MediaStreamTrack kind, + * MediaStreamTrackProcessor readable, WebTransportStream writable, * AudioEncoderConfig/VideoEncoderConfig] */ // eslint-disable-next-line no-undef -onmessage = (e) => { - if (e.data[0] === 'media-sender') { - const [trackId, trackKind, trackReadable, sendStreamWritable, config] = - e.data[1]; - let encoder; - const writer = sendStreamWritable.getWriter(); - if (trackKind === 'audio') { - encoder = initAudioEncoder(config, writer); - } else { // Video. - encoder = initVideoEncoder(config, writer); - } - encoders.set(trackId, encoder); - writers.set(trackId, writer); - readMediaData(trackReadable, encoder); - writeTrackId(trackKind, writer); +onmessage = async (e) => { + const [command, args] = e.data; + switch (command) { + case 'media-sender': + const [publicationId, trackId, trackKind, trackReadable, sendStreamWritable, config] = + args; + initMediaSender( + publicationId, trackId, trackKind, trackReadable, sendStreamWritable, + config); + break; + case 'rtcp-feedback': + const [feedback, publicationId] = args; + await handleFeedback(feedback, publicationId); + break; + case 'init-rtp': + await initRtpModule(); + break; + case 'rtp-packet': + await handleRtpPacket(args); + break; + default: + console.warn('Unrecognized command ' + command); } }; +async function initMediaSender( + publicationId, trackId, trackKind, trackReadable, sendStreamWritable, + config) { + let encoder; + const writer = sendStreamWritable.getWriter(); + if (trackKind === 'audio') { + encoder = initAudioEncoder(config, writer); + } else { // Video. + encoder = initVideoEncoder(config, writer); + keyFrameRequested[publicationId] = false; + } + encoders.set(trackId, encoder); + writers.set(trackId, writer); + readMediaData(trackReadable, encoder, publicationId); + writeTrackId(trackKind, writer); +} + +async function initRtpModule() { + const wasm = await fetchWasm(); + mediaSession = new wasm.MediaSession(); +} + +async function fetchWasm() { + const owtWasmModule = {}; + initModule(owtWasmModule); + await owtWasmModule.ready; + return owtWasmModule; +} + +async function handleFeedback(feedback, publicationId) { + if (feedback === 'key-frame-request') { + console.log('Setting key frame request flag.'); + keyFrameRequested[publicationId] = true; + } +} + async function videoOutput(writer, chunk, metadata) { // TODO: Combine audio and video output callback. if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { @@ -55,7 +103,7 @@ async function videoOutput(writer, chunk, metadata) { } function videoError(error) { - Logger.error('Video encode error: ' + error.message); + console.error('Video encode error: ' + error.message); } async function audioOutput(writer, chunk, metadata) { @@ -71,7 +119,7 @@ async function audioOutput(writer, chunk, metadata) { } function audioError(error) { - Logger.error(`Audio encode error: ${error.message}`); + console.error(`Audio encode error: ${error.message}`); } async function writeTrackId(kind, writer) { @@ -119,22 +167,34 @@ function videoFrameOutputCallback(frame) { } function webCodecsErrorCallback(error) { - Logger.warn('error: ' + error.message); + console.warn('error: ' + error.message); } // Read data from media track. -async function readMediaData(trackReadable, encoder) { +async function readMediaData(trackReadable, encoder, publicationId) { const reader = trackReadable.getReader(); // eslint-disable-next-line no-constant-condition while (true) { const {value, done} = await reader.read(); if (done) { - Logger.debug('MediaStream ends.'); + console.debug('MediaStream ends.'); break; } - encoder.encode(value); + if (keyFrameRequested.get(publicationId)) { + console.debug(typeof encoder + ' encode a key frame.') + encoder.encode(value, {keyFrame: true}); + keyFrameRequested[publicationId] = false; + } else { + encoder.encode(value); + } value.close(); } } +async function handleRtpPacket(packet){ + // const buffer = Module._malloc(packet.byteLength); + // Module.writeArrayToMemory(packet, buffer); + // this._receiver.onRtpPacket(buffer, packet.byteLength); +} + initVideoDecoder();