Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
Added http client for conference
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkonst committed Aug 25, 2023
1 parent 471ee23 commit 50b575a
Show file tree
Hide file tree
Showing 14 changed files with 6,060 additions and 1,377 deletions.
1 change: 1 addition & 0 deletions .nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v16.15
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ $ wrtc-agent --help
Options:
--version Show version number [boolean]
-c, --client-id Client id for mqtt-client [string] [required]
-e, --endpoint HTTP API endpoint for Conference [string] [required]
-n, --name Conference app name [string] [required]
-P, --password Password for mqtt-client [string] [required]
-r, --room-id Conference room id [string] [required]
Expand All @@ -45,7 +46,8 @@ Options:
#!/usr/bin/env bash

ACCESS_TOKEN=foobar
BROKER_URI=ws://example.org/
BROKER_URI=wss://example.org/
CONFERENCE_API_ENDPOINT=https://example.org/api
CONFERENCE_APP_NAME=conference.example.org
CONFERENCE_ROOM_ID=ea3f9fd1-3356-43b4-b709-b7cfc563ea59
STUN_URL=stun:stun.example.org:3478
Expand All @@ -56,6 +58,7 @@ TURN_PASSWORD=password

wrtc-agent \
-c web.john-doe.example.org \
-e ${CONFERENCE_API_ENDPOINT} \
-n ${CONFERENCE_APP_NAME} \
-P ${ACCESS_TOKEN} \
-r ${CONFERENCE_ROOM_ID} \
Expand Down
Binary file removed build/darwin/Release/wrtc.node
Binary file not shown.
Binary file removed build/linux/Release/wrtc.node
Binary file not shown.
93 changes: 68 additions & 25 deletions cli.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
#!/usr/bin/env node

/* eslint-disable quote-props */
require('isomorphic-fetch')
require('regenerator-runtime/runtime')

const process = require('process')
const debounce = require('lodash/debounce')

const { argv } = require('yargs')
.scriptName('wrtc-agent')
.options({
Expand All @@ -10,6 +16,12 @@ const { argv } = require('yargs')
description: 'Client id for mqtt-client',
type: 'string'
},
'e': {
alias: 'endpoint',
demandOption: true,
description: 'HTTP API endpoint for Conference client',
type: 'string'
},
'n': {
alias: 'name',
demandOption: true,
Expand Down Expand Up @@ -78,27 +90,29 @@ const { argv } = require('yargs')
.help()
/* eslint-enable quote-props */

const { createPeerStatsMonitor, stats2metrics } = require('./lib/metrics')
const { createClient, enterRoom, publishTelemetry } = require('./lib/mqtt')
// const { createPeerStatsMonitor, stats2metrics } = require('./lib/metrics')
const { createClient } = require('./lib/mqtt')
const { Peer, transformOffer } = require('./lib/peer')

// args
const {
clientId,
endpoint,
name: appName,
password,
roomId,
relayOnly,
stun,
telemetry: telemetryAppName,
telemetryInterval,
// telemetry: telemetryAppName,
// telemetryInterval,
turn,
turnPassword,
turnUsername,
uri,
videoCodec
} = argv

const agentLabel = clientId.split('.')[0]
const iceServers = [
{ urls: stun },
{
Expand Down Expand Up @@ -156,34 +170,64 @@ function listRtcStreamAll (client, roomId) {
})
}

function startListening (client, mqttClient, activeRtcStream) {
function startListening (client, mqttClient, activeRtcStream, agentLabel) {
const activeRtcId = activeRtcStream.rtc_id
const listenerOptions = { offerToReceiveVideo: true, offerToReceiveAudio: true }

let handleId = null
let signalList = []

function sendSignals (errorCallback) {
if (!signalList.length) {
return
}

const signals = signalList.slice()

signalList = []

client.createRtcSignal(handleId, signals, undefined, agentLabel)
.catch(errorCallback)
}

const debouncedSendSignals = debounce(sendSignals.bind(null, (error) => {
console.log('[sendSignals] error', error)
}), 300)

peer = new Peer(
{ iceServers, iceTransportPolicy },
candidateObj => {
const { candidate, completed, sdpMid, sdpMLineIndex } = candidateObj

client.createRtcSignal(handleId, completed ? candidateObj : { candidate, sdpMid, sdpMLineIndex })
.catch(error => console.debug('[startListening] error', error))
signalList.push(completed ? candidateObj : { candidate, sdpMid, sdpMLineIndex })

debouncedSendSignals()
},
(track, streams) => {
(track) => {
console.debug('[track]', track.kind)
}
)

if (telemetryAppName) {
peerStats = createPeerStatsMonitor(peer._peer, telemetryInterval, (stats) => {
const payload = stats2metrics(stats)

publishTelemetry(mqttClient, clientId, telemetryAppName, payload)
})
}

client.connectRtc(activeRtcId)
// if (telemetryAppName) {
// peerStats = createPeerStatsMonitor(peer._peer, 1000, (stats) => {
// const payload = stats2metrics(stats)
// const data = {}
//
// payload.forEach(metric => data[metric.metric] = metric.value)
//
// process.send({ agentLabel, state: 'active', metrics: data })
//
// // console.log(`===[${agentLabel}]===`)
// // payload.forEach(metric => console.log(`${metric.metric}: \t\t\t${metric.value}`))
// // console.log('[stats]', payload)
//
// // metricsTable[agentLabel] = data
//
// // console.table(metricsTable)
// })
// }

client.connectRtc(activeRtcId, agentLabel)
.then((response) => {
handleId = response.handle_id

Expand All @@ -192,7 +236,7 @@ function startListening (client, mqttClient, activeRtcStream) {
.then(offer => {
const newOffer = transformOffer(offer, { videoCodec })

return client.createRtcSignal(handleId, newOffer)
return client.createRtcSignal(handleId, newOffer, undefined, agentLabel)
.then((response) => ({ response, offer: newOffer }))
})
.then(({ response, offer }) => {
Expand All @@ -214,8 +258,8 @@ function stopListening () {
}
}

createClient({ appName, clientId, password, uri })
.then(({ conferenceClient, mqttClient }) => {
createClient({ agentLabel, appName, clientId, endpoint, password, uri })
.then(({ conferenceClient, httpConferenceClient, mqttClient }) => {
function isStreamActive (stream) {
const { time } = stream

Expand All @@ -232,7 +276,7 @@ createClient({ appName, clientId, password, uri })
if (!activeRtcStream && isStreamActive(stream)) {
activeRtcStream = stream

startListening(conferenceClient, mqttClient, activeRtcStream)
startListening(httpConferenceClient, mqttClient, activeRtcStream, agentLabel)
} else if (activeRtcStream && stream && activeRtcStream.id === stream.id && isStreamEnded(stream)) {
activeRtcStream = null

Expand All @@ -255,18 +299,17 @@ createClient({ appName, clientId, password, uri })
handleStream(event.data)
})

enterRoom(conferenceClient, roomId, clientId)
httpConferenceClient.enterRoom(roomId, agentLabel)
.then(() => {
console.log('[READY]')

listRtcStreamAll(conferenceClient, roomId)
listRtcStreamAll(httpConferenceClient, roomId)
.then((response) => {
console.log('[listRtcStream] response', response)

if (response.length > 0 && isStreamActive(response[0])) {
handleStream(response[0])
}
})
.catch(error => console.log('[listRtcStreamAll] error', error))
})
.catch(error => console.log('[enterRoom] error', error))
})
137 changes: 69 additions & 68 deletions lib/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ function stats2metrics (stats) {
// 'packetRate'
// ]
const remoteAudioMetricList = [
'packetsReceived',
'bytesReceived',
'headerBytesReceived',
// 'packetsReceived',
// 'bytesReceived',
// 'headerBytesReceived',
'packetsLost',
'jitter',
'jitterBufferDelay',
'jitterBufferEmittedCount',
'audioLevel',
'totalAudioEnergy',
'totalSamplesReceived',
'totalSamplesDuration',
// 'jitter',
// 'jitterBufferDelay',
// 'jitterBufferEmittedCount',
// 'audioLevel',
// 'totalAudioEnergy',
// 'totalSamplesReceived',
// 'totalSamplesDuration',
'bitrate',
'packetRate'
// 'packetRate'
]
// const localVideoMetricList = [
// 'firCount',
Expand All @@ -53,49 +53,50 @@ function stats2metrics (stats) {
// 'packetRate'
// ]
const remoteVideoMetricList = [
'firCount',
'pliCount',
'nackCount',
'qpSum',
'packetsReceived',
'bytesReceived',
'headerBytesReceived',
// 'firCount',
// 'pliCount',
// 'nackCount',
// 'qpSum',
// 'packetsReceived',
// 'bytesReceived',
// 'headerBytesReceived',
'packetsLost',
'framesDecoded',
'keyFramesDecoded',
'totalDecodeTime',
'totalInterFrameDelay',
'totalSquaredInterFrameDelay',
'jitterBufferDelay',
'jitterBufferEmittedCount',
'frameWidth',
'frameHeight',
'framesReceived',
'framesDropped',
// 'framesDecoded',
// 'keyFramesDecoded',
// 'totalDecodeTime',
// 'totalInterFrameDelay',
// 'totalSquaredInterFrameDelay',
// 'jitterBufferDelay',
// 'jitterBufferEmittedCount',
// 'frameWidth',
// 'frameHeight',
// 'framesReceived',
// 'framesDropped',
'bitrate',
'packetRate'
]
const candidateLocalMetricList = [
'candidateType',
'networkType',
'protocol',
]
const candidateRemoteMetricList = [
'candidateType',
'protocol',
]
const connectionMetricList = [
'bytesSent',
'bytesReceived',
'totalRoundTripTime',
'currentRoundTripTime',
'availableOutgoingBitrate'
// 'packetRate'
]
// const candidateLocalMetricList = [
// 'candidateType',
// 'networkType',
// 'protocol',
// ]
// const candidateRemoteMetricList = [
// 'candidateType',
// 'protocol',
// ]
// const connectionMetricList = [
// // 'bytesSent',
// // 'bytesReceived',
// 'totalRoundTripTime',
// 'currentRoundTripTime',
// 'availableOutgoingBitrate'
// ]
const tags = {
peer_id: stats.peerId,
}

const metricBaseName = 'apps.wrtc-agent.pc'
// const metricBaseName = 'apps.wrtc-agent.pc'
const metricBaseName = ''
// const localAudioMetricName = `${metricBaseName}.audio.local`
// const localAudioMetrics = localAudioMetricList.map(_ => ({
// metric: `${localAudioMetricName}.${snakeCase(_)}`,
Expand All @@ -118,33 +119,33 @@ function stats2metrics (stats) {
value: stats.data.video.remote[_] || 0,
tags
}))
const connectionMetricName = `${metricBaseName}.connection`
const connectionMetrics = connectionMetricList.map(_ => ({
metric: `${connectionMetricName}.${snakeCase(_)}`,
value: stats.data.connection[_] || 0,
tags
}))
const candidateLocalMetricName = `${metricBaseName}.connection.local`
const candidateLocalMetrics = candidateLocalMetricList.map(_ => ({
metric: `${candidateLocalMetricName}.${snakeCase(_)}`,
value: stats.data.connection.local[_] || 0,
tags,
}))
const candidateRemoteMetricName = `${metricBaseName}.connection.remote`
const candidateRemoteMetrics = candidateRemoteMetricList.map(_ => ({
metric: `${candidateRemoteMetricName}.${snakeCase(_)}`,
value: stats.data.connection.remote[_] || 0,
tags,
}))
// const connectionMetricName = `${metricBaseName}.connection`
// const connectionMetrics = connectionMetricList.map(_ => ({
// metric: `${connectionMetricName}.${snakeCase(_)}`,
// value: stats.data.connection[_] || 0,
// tags
// }))
// const candidateLocalMetricName = `${metricBaseName}.connection.local`
// const candidateLocalMetrics = candidateLocalMetricList.map(_ => ({
// metric: `${candidateLocalMetricName}.${snakeCase(_)}`,
// value: stats.data.connection.local[_] || 0,
// tags,
// }))
// const candidateRemoteMetricName = `${metricBaseName}.connection.remote`
// const candidateRemoteMetrics = candidateRemoteMetricList.map(_ => ({
// metric: `${candidateRemoteMetricName}.${snakeCase(_)}`,
// value: stats.data.connection.remote[_] || 0,
// tags,
// }))

return [
// ...localAudioMetrics,
...remoteAudioMetrics,
// ...localVideoMetrics,
...remoteVideoMetrics,
...connectionMetrics,
...candidateLocalMetrics,
...candidateRemoteMetrics
// ...connectionMetrics,
// ...candidateLocalMetrics,
// ...candidateRemoteMetrics
]
}

Expand Down
Loading

0 comments on commit 50b575a

Please sign in to comment.