Skip to content

Commit

Permalink
ULMS-2832 Added Broker client, deprecated Conference, Event clients
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkonst committed Mar 7, 2024
1 parent b2a5d68 commit 2585ef9
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 13 deletions.
31 changes: 29 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ulms/api-clients",
"version": "7.1.0",
"version": "7.1.0-dev.0-ulms-2832",
"description": "JavaScript API clients for ULMS platform",
"keywords": [],
"homepage": "https://github.com/foxford/ulms-api-clients-js#readme",
Expand Down Expand Up @@ -37,6 +37,7 @@
"debug": "4.3.4",
"events": "3.3.0",
"mime": "2.6.0",
"mqtt-pattern": "~1.2.0",
"nats.ws": "1.7.2",
"uuid": "8.3.2"
},
Expand Down
176 changes: 176 additions & 0 deletions src/broker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/* eslint-disable unicorn/prevent-abbreviations */
// eslint-disable-next-line unicorn/prefer-node-protocol
import EventEmitter from 'events'
import MQTTPattern from 'mqtt-pattern'

import Codec from './codec'

/**
* Agent reader configuration
* @name AgentReaderConfig
* @type {object}
* @property {string} agent_id
* @property {boolean} receive_audio
* @property {boolean} receive_video
*/

/**
* Agent writer configuration
* @name AgentWriterConfig
* @type {object}
* @property {string} agent_id
* @property {boolean} send_audio
* @property {boolean} send_video
* @property {number} video_remb
*/

const entityEventsEnum = {
AGENT_UPDATE: 'agent.update',
AGENT_WRITER_CONFIG_UPDATE: 'agent_writer_config.update',
CONFERENCE_ROOM_CLOSE: 'conference_room.close',
CONFERENCE_ROOM_ENTER: 'conference_room.enter',
CONFERENCE_ROOM_LEAVE: 'conference_room.leave',
EVENT_CREATE: 'event.create',
EVENT_ROOM_ENTER: 'event_room.enter',
EVENT_ROOM_LEAVE: 'event_room.leave',
EVENT_ROOM_UPDATE: 'event_room.update',
GROUP_UPDATE: 'video_group.update',
RTC_STREAM_AGENT_SPEAKING: 'rtc_stream.agent_speaking',
RTC_STREAM_UPDATE: 'rtc_stream.update',
}

const ROOM_CLOSE_EVENT = 'room.close'
const ROOM_ENTER_EVENT = 'room.enter'
const ROOM_LEAVE_EVENT = 'room.leave'
const ROOM_UPDATE_EVENT = 'room.update'

const appNameToEventNameMap = {
'conference.svc.netology-group.services': {
[ROOM_CLOSE_EVENT]: entityEventsEnum.CONFERENCE_ROOM_CLOSE,
[ROOM_ENTER_EVENT]: entityEventsEnum.CONFERENCE_ROOM_ENTER,
[ROOM_LEAVE_EVENT]: entityEventsEnum.CONFERENCE_ROOM_LEAVE,
},
'event.svc.netology-group.services': {
[ROOM_ENTER_EVENT]: entityEventsEnum.EVENT_ROOM_ENTER,
[ROOM_LEAVE_EVENT]: entityEventsEnum.EVENT_ROOM_LEAVE,
[ROOM_UPDATE_EVENT]: entityEventsEnum.EVENT_ROOM_UPDATE,
},
}
const eventNamesToTransform = [
ROOM_CLOSE_EVENT,
ROOM_ENTER_EVENT,
ROOM_LEAVE_EVENT,
ROOM_UPDATE_EVENT,
]

class Broker {
/**
* Entity events enum
* @returns {{
* AGENT_UPDATE: string,
* AGENT_WRITER_CONFIG_UPDATE: string,
* CONFERENCE_ROOM_CLOSE: string,
* CONFERENCE_ROOM_ENTER: string,
* CONFERENCE_ROOM_LEAVE: string,
* EVENT_CREATE: string,
* EVENT_ROOM_ENTER: string,
* EVENT_ROOM_LEAVE: string,
* EVENT_ROOM_UPDATE: string,
* GROUP_UPDATE: string,
* RTC_STREAM_AGENT_SPEAKING: string,
* RTC_STREAM_UPDATE: string,
* }}
*/
static get events() {
return entityEventsEnum
}

constructor(mqttClient) {
this.mqtt = mqttClient

this.codec = new Codec(
(data) => JSON.stringify(data),
(data) => {
let payload

try {
payload = JSON.parse(data.toString())
} catch {
payload = {}
}

return payload
}
)
this.ee = new EventEmitter()

this.bindListeners()
}

bindListeners() {
console.log('[bindListeners] on', this.mqtt.prototype.events.MESSAGE)

Check warning on line 111 in src/broker.js

View workflow job for this annotation

GitHub Actions / check

Unexpected console statement
this.mqtt.on(
this.mqtt.prototype.events.MESSAGE,
this.messageHandler.bind(this)
)
}

messageHandler(topic, message, packet) {
const payload = this.codec.decode(message)
const {
properties: {
userProperties: { label, type },
},
} = packet

if (type === 'event' && payload !== undefined) {
if (eventNamesToTransform.contains(label)) {
const topicParams = MQTTPattern.exec(
`apps/+appName/api/v1/rooms/+roomId/events`,
topic
)

console.log('[label] old event', label)

Check warning on line 133 in src/broker.js

View workflow job for this annotation

GitHub Actions / check

Unexpected console statement
console.log('[topicParams]', topicParams)

Check warning on line 134 in src/broker.js

View workflow job for this annotation

GitHub Actions / check

Unexpected console statement

if (topicParams !== null) {
const { appName } = topicParams
const transformedLabel = appNameToEventNameMap[appName][label]

const event = {
type: transformedLabel,
data: payload,
}

this.ee.emit(transformedLabel, event)
} else {
console.warn('[topicParams] no parameters')

Check warning on line 147 in src/broker.js

View workflow job for this annotation

GitHub Actions / check

Unexpected console statement
}
} else {
const event = {
type: label,
data: payload,
}

this.ee.emit(label, event)
}
} else {
// do nothing
console.log('[messageHandler] ignore message', type)

Check warning on line 159 in src/broker.js

View workflow job for this annotation

GitHub Actions / check

Unexpected console statement
}
}

on(eventName, eventHandler) {
this.ee.addListener(eventName, eventHandler)
}

off(eventName, eventHandler) {
this.ee.removeListener(eventName, eventHandler)
}

destroy() {
this.ee.removeAllListeners()
}
}

export default Broker
8 changes: 4 additions & 4 deletions src/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export const sleep = async (ms) =>
export async function enterServiceRoom(
client,
httpClient,
eventName,
roomId,
id,
label,
Expand All @@ -67,7 +68,6 @@ export async function enterServiceRoom(
serviceName
) {
const backoff = new Backoff()
const EVENT_NAME = 'room.enter'
const isTransportConnected = () => client.mqtt.connected
let enterRoomSuccess = false
let response
Expand All @@ -76,11 +76,11 @@ export async function enterServiceRoom(
if (event.data.agent_id === id) {
enterRoomSuccess = true

client.off(EVENT_NAME, handler)
client.off(eventName, handler)
}
}

client.on(EVENT_NAME, handler)
client.on(eventName, handler)

try {
// eslint-disable-next-line no-constant-condition
Expand Down Expand Up @@ -108,7 +108,7 @@ export async function enterServiceRoom(
trackEvent('Debug', `${serviceName}.Subscription.Retry`)
}
} catch (error) {
client.off(EVENT_NAME, handler)
client.off(eventName, handler)

backoff.reset()

Expand Down
7 changes: 4 additions & 3 deletions src/conference.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import Service from './service'
* @property {number} video_remb
*/

/**
* @deprecated Use Broker class instead of Conference class
*/
class Conference extends Service {
/**
* Conference events enum
Expand All @@ -28,19 +31,17 @@ class Conference extends Service {
* ROOM_CLOSE: string,
* ROOM_ENTER: string,
* ROOM_LEAVE: string,
* ROOM_OPEN: string,
* RTC_STREAM_AGENT_SPEAKING: string
* RTC_STREAM_UPDATE: string
* }}
*/
static get events() {
return {
AGENT_WRITER_CONFIG_UPDATE: 'agent_writer_config.update',
GROUP_UPDATE: 'group.update',
GROUP_UPDATE: 'video_group.update',
ROOM_CLOSE: 'room.close',
ROOM_ENTER: 'room.enter',
ROOM_LEAVE: 'room.leave',
ROOM_OPEN: 'room.open',
RTC_STREAM_AGENT_SPEAKING: 'rtc_stream.agent_speaking',
RTC_STREAM_UPDATE: 'rtc_stream.update',
}
Expand Down
14 changes: 11 additions & 3 deletions src/event.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import Service from './service'

/**
* @deprecated Use Broker class instead of Event class
*/
class Event extends Service {
/**
* Change type enum
Expand All @@ -15,13 +18,18 @@ class Event extends Service {

/**
* Events enum
* @returns {{AGENT_UPDATE: string, EVENT_CREATE: string, ROOM_CLOSE: string, ROOM_ENTER: string, ROOM_LEAVE: string, ROOM_UPDATE: string}}
* @returns {{
* AGENT_UPDATE: string,
* EVENT_CREATE: string,
* ROOM_ENTER: string,
* ROOM_LEAVE: string,
* ROOM_UPDATE: string
* }}
*/
static get events() {
return {
AGENT_UPDATE: 'agent.update',
EVENT_CREATE: 'event.create', // eslint-disable-line sonarjs/no-duplicate-string
ROOM_CLOSE: 'room.close',
EVENT_CREATE: 'event.create',
ROOM_ENTER: 'room.enter',
ROOM_LEAVE: 'room.leave',
ROOM_UPDATE: 'room.update',
Expand Down
12 changes: 12 additions & 0 deletions src/http-event.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ const eventEndpoints = {
}

class HTTPEvent extends BasicClient {
/**
* Change type enum
* @returns {{ADDITION: string, MODIFICATION: string, REMOVAL: string}}
*/
static get changeTypes() {
return {
ADDITION: 'addition',
MODIFICATION: 'modification',
REMOVAL: 'removal',
}
}

/**
* Read room
* @param id
Expand Down
1 change: 1 addition & 0 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export { default as Backoff } from './backoff'
export { default as Broker } from './broker'
export { default as Conference } from './conference'
export { default as Dispatcher } from './dispatcher'
export { default as Event } from './event'
Expand Down
3 changes: 3 additions & 0 deletions src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import EventEmitter from 'events'

import Codec from './codec'

/**
* @deprecated Use Broker class instead of Service base class
*/
class Service {
constructor(mqttClient, agentId, appName) {
this.agentId = agentId
Expand Down

0 comments on commit 2585ef9

Please sign in to comment.