Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Publish media data with WebTransport.
Browse files Browse the repository at this point in the history
  • Loading branch information
jianjunz committed Aug 24, 2021
1 parent d1ef66b commit 19b3675
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 29 deletions.
75 changes: 75 additions & 0 deletions src/samples/conference/public/scripts/media-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (C) <2021> Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0

/* eslint-disable require-jsdoc */
/* global VideoEncoder */

let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter;
// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0.
const sizePrefix = 4;

onmessage = (e) => {
if (e.data[0] === 'video-source') {
readVideoData(e.data[1]);
} else if (e.data[0] === 'send-stream') {
bidirectionalStreamWritable = e.data[1];
sendStreamWriter = bidirectionalStreamWritable.getWriter();
writeTrackId();
initVideoEncoder();
}
};

async function videoOutput(chunk, metadata) {
if (bidirectionalStreamWritable) {
if (!frameBuffer ||
frameBuffer.byteLength < chunk.byteLength + sizePrefix) {
frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix);
}
const bufferView = new Uint8Array(frameBuffer, sizePrefix);
chunk.copyTo(bufferView);
const dataView =
new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix);
dataView.setUint32(0, chunk.byteLength);
await sendStreamWriter.ready;
await sendStreamWriter.write(dataView);
console.log('Write a frame.');
}
}

function videoError(error) {
console.log('Encode error, ' + error);
}

async function writeTrackId() {
const id = new Uint8Array(16);
id[16] = 2;
await sendStreamWriter.ready;
sendStreamWriter.write(id);
}

function initVideoEncoder() {
videoEncoder = new VideoEncoder({output: videoOutput, error: videoError});
videoEncoder.configure({
codec: 'avc1.4d002a',
width: 640,
height: 480,
framerate: 30,
latencyMode: 'realtime',
avc: {format: 'annexb'},
});
}

// Read data from video track.
async function readVideoData(readable) {
const reader = readable.getReader();
while (true) {
const {value, done} = await reader.read();
if (done) {
console.log('MediaStream ends.');
break;
}
videoEncoder.encode(value);
value.close();
}
}
53 changes: 42 additions & 11 deletions src/samples/conference/public/scripts/quic.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,30 @@
//
// SPDX-License-Identifier: Apache-2.0

/* eslint-disable require-jsdoc */

'use strict';

let quicChannel = null;
let bidirectionalStream = null;
let writeTask;
const conference=new Owt.Conference.ConferenceClient();
let writeTask, mediaStream, mediaWorker, conferenceId, myId;

const conference = new Owt.Conference.ConferenceClient({
webTransportConfiguration: {
serverCertificateFingerprints: [{
value:
'DD:A8:11:FD:A1:08:17:41:36:CD:1A:33:1E:CF:AE:0D:46:3D:15:16:2C:67:C5:A2:06:35:C2:0E:88:A1:9E:C6',
algorithm: 'sha-256',
}]
}
});
conference.addEventListener('streamadded', async (event) => {
console.log(event.stream);
if (event.stream.origin == myId) {
mixStream(
conferenceId, event.stream.id, 'common',
'http://jianjunz-nuc-ubuntu.sh.intel.com:3001');
}
if (event.stream.source.data) {
const subscription = await conference.subscribe(event.stream);
const reader = subscription.stream.readable.getReader();
Expand All @@ -19,25 +35,27 @@ conference.addEventListener('streamadded', async (event) => {
console.log('Subscription ends.');
break;
}
console.log('Received data: '+value);
console.log('Received data: ' + value);
}
}
});

function updateConferenceStatus(message) {
document.getElementById('conference-status').innerHTML +=
('<p>' + message + '</p>');
('<p>' + message + '</p>');
}


function joinConference() {
return new Promise((resolve, reject) => {
createToken(undefined, 'user', 'presenter', resp => {
conference.join(resp).then(() => {
createToken(undefined, 'user', 'presenter', token => {
conference.join(token).then((info) => {
conferenceId = info.id;
myId = info.self.id;
updateConferenceStatus('Connected to conference server.');
resolve();
});
});
}, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001');
});
};

Expand All @@ -57,8 +75,11 @@ function createRandomContentSessionId() {

async function createSendChannel() {
bidirectionalStream = await conference.createSendStream();
const localStream=new Owt.Base.LocalStream(bidirectionalStream, new Owt.Base.StreamSourceInfo(undefined, undefined,true));
const publication = await conference.publish(localStream);
const localStream = new Owt.Base.LocalStream(
bidirectionalStream,
new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined));
const publication = await conference.publish(
localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}});
console.log(publication);
updateConferenceStatus('Created send channel.');
}
Expand All @@ -79,6 +100,16 @@ async function writeUuid() {
return;
}

async function writeVideoData() {
mediaStream = await navigator.mediaDevices.getUserMedia({video: true});
const track = new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]);
mediaWorker = new Worker('./scripts/media-worker.js');
mediaWorker.postMessage(['video-source', track.readable], [track.readable]);
mediaWorker.postMessage(
['send-stream', bidirectionalStream.writable],
[bidirectionalStream.writable]);
}

async function writeData() {
const encoder = new TextEncoder();
const encoded = encoder.encode('message', {stream: true});
Expand All @@ -98,8 +129,8 @@ document.getElementById('start-sending').addEventListener('click', async () => {
updateConferenceStatus('Stream is not created.');
return;
}
await writeUuid();
writeTask = setInterval(writeData, 2000);
writeVideoData();
// writeTask = setInterval(writeData, 2000);
updateConferenceStatus('Started sending.');
});

Expand Down
20 changes: 16 additions & 4 deletions src/sdk/base/publication.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,29 @@ export class PublishOptions {
// eslint-disable-next-line require-jsdoc
constructor(audio, video, transport) {
/**
* @member {?Array<Owt.Base.AudioEncodingParameters> | ?Array<RTCRtpEncodingParameters>} audio
* @member {?Array<Owt.Base.AudioEncodingParameters> |
* ?Array<RTCRtpEncodingParameters> | ?AudioEncoderConfig } audio
* @instance
* @memberof Owt.Base.PublishOptions
* @desc Parameters for audio RtpSender. Publishing with RTCRtpEncodingParameters is an experimental feature. It is subject to change.
* @desc Parameters for audio RtpSender when transport's type is 'webrtc' or
* configuration of audio encoder when transport's type is 'quic'.
* Publishing with RTCRtpEncodingParameters is an experimental feature. It
* is subject to change.
* @see {@link https://www.w3.org/TR/webrtc/#rtcrtpencodingparameters|RTCRtpEncodingParameters}
* @see {@link https://w3c.github.io/webcodecs/#dictdef-audioencoderconfig|AudioEncoderConfig}
*/
this.audio = audio;
/**
* @member {?Array<Owt.Base.VideoEncodingParameters> | ?Array<RTCRtpEncodingParameters>} video
* @member {?Array<Owt.Base.VideoEncodingParameters> |
* ?Array<RTCRtpEncodingParameters> | ?VideoEncoderConfig } video
* @instance
* @memberof Owt.Base.PublishOptions
* @desc Parameters for video RtpSender. Publishing with RTCRtpEncodingParameters is an experimental feature. It is subject to change.
* @desc Parameters for video RtpSender when transport's type is 'webrtc' or
* configuration of video encoder when transport's type is 'quic'.
* Publishing with RTCRtpEncodingParameters is an experimental feature. It
* is subject to change.
* @see {@link https://www.w3.org/TR/webrtc/#rtcrtpencodingparameters|RTCRtpEncodingParameters}
* @see {@link https://w3c.github.io/webcodecs/#dictdef-videoencoderconfig|VideoEncoderConfig}
*/
this.video = video;
/**
Expand Down
4 changes: 3 additions & 1 deletion src/sdk/base/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ export class TransportConstraints {
* @member {Array.<Owt.Base.TransportType>} type
* @instance
* @memberof Owt.Base.TransportConstraints
* @desc Transport type for publication and subscription.
* @desc Transport type for publication and subscription. 'quic' is only
* supported in conference mode when WebTransport is supported by client and
* enabled at server side.
*/
this.type = type;
/**
Expand Down
13 changes: 8 additions & 5 deletions src/sdk/conference/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,16 +464,16 @@ export const ConferenceClient = function(config, signalingImpl) {
* @instance
* @desc Publish a LocalStream to conference server. Other participants will be able to subscribe this stream when it is successfully published.
* @param {Owt.Base.LocalStream} stream The stream to be published.
* @param {(Owt.Base.PublishOptions|RTCRtpTransceiver[])} options If options is a PublishOptions, the stream will be published as options specified. If options is a list of RTCRtpTransceivers, each track in the first argument must have a corresponding RTCRtpTransceiver here, and the track will be published with the RTCRtpTransceiver associated with it.
* @param {(Owt.Base.PublishOptions|RTCRtpTransceiver[])} options If options is a PublishOptions, the stream will be published as options specified. If options is a list of RTCRtpTransceivers, each track in the first argument must have a corresponding RTCRtpTransceiver here, and the track will be published with the RTCRtpTransceiver associated with it. If the type of transport is quic, PublishOptions.audio should be AudioEncoderConfig, and PublishOptions.video should be VideoEncoderConfig.
* @param {string[]} videoCodecs Video codec names for publishing. Valid values are 'VP8', 'VP9' and 'H264'. This parameter only valid when the second argument is PublishOptions and options.video is RTCRtpEncodingParameters. Publishing with RTCRtpEncodingParameters is an experimental feature. This parameter is subject to change.
* @return {Promise<Publication, Error>} Returned promise will be resolved with a newly created Publication once specific stream is successfully published, or rejected with a newly created Error if stream is invalid or options cannot be satisfied. Successfully published means PeerConnection is established and server is able to process media data.
*/
this.publish = function(stream, options, videoCodecs) {
if (!(stream instanceof StreamModule.LocalStream)) {
return Promise.reject(new ConferenceError('Invalid stream.'));
}
if (stream.source.data) {
return quicTransportChannel.publish(stream);
if (options?.transport?.type === 'quic') {
return quicTransportChannel.publish(stream, options, videoCodecs);
}
if (publishChannels.has(stream.mediaStream.id)) {
return Promise.reject(new ConferenceError(
Expand Down Expand Up @@ -501,13 +501,16 @@ export const ConferenceClient = function(config, signalingImpl) {
'Invalid source info. A remote stream is either a data stream or ' +
'a media stream.'));
}
}
if (options?.transport?.type === 'quic') {
if (quicTransportChannel) {
return quicTransportChannel.subscribe(stream);
return quicTransportChannel.subscribe(stream, options);
} else {
return Promise.reject(new TypeError('WebTransport is not supported.'));
}
} else {
return peerConnectionChannel.subscribe(stream, options);
}
return peerConnectionChannel.subscribe(stream, options);
};

/**
Expand Down
53 changes: 45 additions & 8 deletions src/sdk/conference/quicconnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,23 @@ export class QuicConnection extends EventDispatcher {
return quicStream;
}

async publish(stream) {
async publish(stream, options) {
// TODO: Avoid a stream to be published twice. The first 16 bit data send to
// server must be it's publication ID.
// TODO: Potential failure because of publication stream is created faster
// than signaling stream(created by the 1st call to initiatePublication).
const publicationId = await this._initiatePublication();
const publicationId = await this._initiatePublication(stream, options);
const quicStream = stream.stream;
const writer = quicStream.writable.getWriter();
await writer.ready;
writer.write(this._uuidToUint8Array(publicationId));
writer.releaseLock();
Logger.info('publish id');
this._quicStreams.set(publicationId, quicStream);
const publication = new Publication(publicationId, () => {
this._signaling.sendSignalingMessage('unpublish', {id: publication})
.catch((e) => {
Logger.warning('MCU returns negative ack for unpublishing, ' + e);
Logger.warning(
'Server returns negative ack for unpublishing, ' + e);
});
} /* TODO: getStats, mute, unmute is not implemented */);
return publication;
Expand Down Expand Up @@ -196,7 +196,7 @@ export class QuicConnection extends EventDispatcher {
return s;
}

subscribe(stream) {
subscribe(stream, options) {
const p = new Promise((resolve, reject) => {
this._signaling
.sendSignalingMessage('subscribe', {
Expand Down Expand Up @@ -231,10 +231,47 @@ export class QuicConnection extends EventDispatcher {
});
}

async _initiatePublication() {
async _initiatePublication(stream, options) {
const media = {tracks: []};
if (stream.source.audio) {
if (!options.audio) {
throw new TypeError(
'Options for audio is missing. Publish audio track with ' +
'WebTransport must have AudioEncoderConfig specified.');
}
const track = {
from: stream.id,
source: stream.source.audio,
type: 'audio',
format: {
codec: options.audio.codec,
sampleRate: options.audio.sampleRate,
channelNum: options.audio.numberOfChannels,
},
};
media.tracks.push(track);
}
if (stream.source.video) {
if (!options.video) {
throw new TypeError(
'Options for audio is missing. Publish video track with ' +
'WebTransport must have VideoEncoderConfig specified.');
}
const track = {
from: stream.id,
source: stream.source.video,
type: 'video',
// TODO: convert from MIME type to the format required by server.
format: {
codec: 'h264',
profile: 'B',
},
};
media.tracks.push(track);
}
const data = await this._signaling.sendSignalingMessage('publish', {
media: null,
data: true,
media: stream.source.data ? null : media,
data: stream.source.data,
transport: {type: 'quic', id: this._transportId},
});
if (this._transportId !== data.transportId) {
Expand Down

0 comments on commit 19b3675

Please sign in to comment.