From abc03c89929aa00bf2528185595d6ecd6bbfd6d0 Mon Sep 17 00:00:00 2001 From: Aleksey Konstantinov Date: Mon, 11 Mar 2024 18:19:50 +0300 Subject: [PATCH] ULMS-2832 Added Broker client, deprecated Conference, Event clients --- package-lock.json | 31 +++++++- package.json | 3 +- src/broker.js | 181 ++++++++++++++++++++++++++++++++++++++++++++++ src/common.js | 16 ++-- src/conference.js | 7 +- src/event.js | 14 +++- src/http-event.js | 12 +++ src/index.js | 1 + src/service.js | 3 + 9 files changed, 254 insertions(+), 14 deletions(-) create mode 100644 src/broker.js diff --git a/package-lock.json b/package-lock.json index 81a1028..3d4637c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,18 +1,19 @@ { "name": "@ulms/api-clients", - "version": "7.1.0", + "version": "7.1.0-dev.4-ulms-2832", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@ulms/api-clients", - "version": "7.1.0", + "version": "7.1.0-dev.4-ulms-2832", "license": "MIT", "dependencies": { "axios": "1.6.2", "debug": "4.3.4", "events": "3.3.0", "mime": "2.6.0", + "mqtt-pattern": "~1.2.0", "nats.ws": "1.7.2", "uuid": "8.3.2" }, @@ -8660,6 +8661,19 @@ "node": ">=0.10.0" } }, + "node_modules/mqtt-match": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/mqtt-match/-/mqtt-match-1.0.3.tgz", + "integrity": "sha512-nfeAp+chyjVeIvvrgMhQCfDAIVp/zXX8rtxHQwuAWuapqAdFs1F0kIekG445ps3xs/qFPK6l2xRlAyiqqwbmrQ==" + }, + "node_modules/mqtt-pattern": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/mqtt-pattern/-/mqtt-pattern-1.2.0.tgz", + "integrity": "sha512-5tvJTrMXcvAWRc4J+YW0pnZOLi20yXFX1R7cjB0291X9aRFPyO+5vbh/kHVwUlNPfJuwijcvrd/AmwDLe1021w==", + "dependencies": { + "mqtt-match": "^1.0.2" + } + }, "node_modules/ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", @@ -18277,6 +18291,19 @@ "is-extendable": "^1.0.1" } }, + "mqtt-match": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/mqtt-match/-/mqtt-match-1.0.3.tgz", + "integrity": "sha512-nfeAp+chyjVeIvvrgMhQCfDAIVp/zXX8rtxHQwuAWuapqAdFs1F0kIekG445ps3xs/qFPK6l2xRlAyiqqwbmrQ==" + }, + "mqtt-pattern": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/mqtt-pattern/-/mqtt-pattern-1.2.0.tgz", + "integrity": "sha512-5tvJTrMXcvAWRc4J+YW0pnZOLi20yXFX1R7cjB0291X9aRFPyO+5vbh/kHVwUlNPfJuwijcvrd/AmwDLe1021w==", + "requires": { + "mqtt-match": "^1.0.2" + } + }, "ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", diff --git a/package.json b/package.json index 839f6ed..28e5b03 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ulms/api-clients", - "version": "7.1.0", + "version": "7.1.0-dev.4-ulms-2832", "description": "JavaScript API clients for ULMS platform", "keywords": [], "homepage": "https://github.com/foxford/ulms-api-clients-js#readme", @@ -37,6 +37,7 @@ "debug": "4.3.4", "events": "3.3.0", "mime": "2.6.0", + "mqtt-pattern": "~1.2.0", "nats.ws": "1.7.2", "uuid": "8.3.2" }, diff --git a/src/broker.js b/src/broker.js new file mode 100644 index 0000000..31eccd0 --- /dev/null +++ b/src/broker.js @@ -0,0 +1,181 @@ +/* eslint-disable unicorn/prevent-abbreviations */ +// eslint-disable-next-line unicorn/prefer-node-protocol +import EventEmitter from 'events' +import MQTTPattern from 'mqtt-pattern' + +import Codec from './codec' + +/** + * Agent reader configuration + * @name AgentReaderConfig + * @type {object} + * @property {string} agent_id + * @property {boolean} receive_audio + * @property {boolean} receive_video + */ + +/** + * Agent writer configuration + * @name AgentWriterConfig + * @type {object} + * @property {string} agent_id + * @property {boolean} send_audio + * @property {boolean} send_video + * @property {number} video_remb + */ + +const entityEventsEnum = { + AGENT_UPDATE: 'agent.update', + AGENT_WRITER_CONFIG_UPDATE: 'agent_writer_config.update', + CONFERENCE_ROOM_CLOSE: 'conference_room.close', + CONFERENCE_ROOM_ENTER: 'conference_room.enter', + CONFERENCE_ROOM_LEAVE: 'conference_room.leave', + EVENT_CREATE: 'event.create', + EVENT_ROOM_ENTER: 'event_room.enter', + EVENT_ROOM_LEAVE: 'event_room.leave', + EVENT_ROOM_UPDATE: 'event_room.update', + GROUP_UPDATE: 'video_group.update', + RTC_STREAM_AGENT_SPEAKING: 'rtc_stream.agent_speaking', + RTC_STREAM_UPDATE: 'rtc_stream.update', +} + +const ROOM_CLOSE_EVENT = 'room.close' +const ROOM_ENTER_EVENT = 'room.enter' +const ROOM_LEAVE_EVENT = 'room.leave' +const ROOM_UPDATE_EVENT = 'room.update' + +const appNameToEventNameMap = { + 'conference.svc.netology-group.services': { + [ROOM_CLOSE_EVENT]: entityEventsEnum.CONFERENCE_ROOM_CLOSE, + [ROOM_ENTER_EVENT]: entityEventsEnum.CONFERENCE_ROOM_ENTER, + [ROOM_LEAVE_EVENT]: entityEventsEnum.CONFERENCE_ROOM_LEAVE, + }, + 'event.svc.netology-group.services': { + [ROOM_ENTER_EVENT]: entityEventsEnum.EVENT_ROOM_ENTER, + [ROOM_LEAVE_EVENT]: entityEventsEnum.EVENT_ROOM_LEAVE, + [ROOM_UPDATE_EVENT]: entityEventsEnum.EVENT_ROOM_UPDATE, + }, +} +const eventNamesToTransform = new Set([ + ROOM_CLOSE_EVENT, + ROOM_ENTER_EVENT, + ROOM_LEAVE_EVENT, + ROOM_UPDATE_EVENT, +]) + +console.log('[Broker] eventNamesToTransform', eventNamesToTransform, [ + ...eventNamesToTransform, +]) + +class Broker { + /** + * Entity events enum + * @returns {{ + * AGENT_UPDATE: string, + * AGENT_WRITER_CONFIG_UPDATE: string, + * CONFERENCE_ROOM_CLOSE: string, + * CONFERENCE_ROOM_ENTER: string, + * CONFERENCE_ROOM_LEAVE: string, + * EVENT_CREATE: string, + * EVENT_ROOM_ENTER: string, + * EVENT_ROOM_LEAVE: string, + * EVENT_ROOM_UPDATE: string, + * GROUP_UPDATE: string, + * RTC_STREAM_AGENT_SPEAKING: string, + * RTC_STREAM_UPDATE: string, + * }} + */ + static get events() { + return entityEventsEnum + } + + constructor(mqttClient) { + this.mqtt = mqttClient + + this.codec = new Codec( + (data) => JSON.stringify(data), + (data) => { + let payload + + try { + payload = JSON.parse(data.toString()) + } catch { + payload = {} + } + + return payload + } + ) + this.ee = new EventEmitter() + + this.bindListeners() + } + + bindListeners() { + this.mqtt.on( + this.mqtt.constructor.events.MESSAGE, + this.messageHandler.bind(this) + ) + } + + messageHandler(topic, message, packet) { + console.log('[Broker:messageHandler]', topic, message) + const payload = this.codec.decode(message) + const { + properties: { + userProperties: { label, type }, + }, + } = packet + + if (type === 'event' && payload !== undefined) { + console.log('[Broker:messageHandler] type', type, payload) + if (eventNamesToTransform.has(label)) { + console.log('[Broker:messageHandler] has label, perform transform') + const topicParams = MQTTPattern.exec( + `apps/+appName/api/v1/rooms/+roomId/events`, + topic + ) + + console.log('[Broker:messageHandler] topicParams', topicParams) + + if (topicParams !== null) { + const { appName } = topicParams + const transformedLabel = appNameToEventNameMap[appName][label] + + const event = { + type: transformedLabel, + data: payload, + } + + this.ee.emit(transformedLabel, event) + } else { + console.warn('[topicParams] no parameters') // eslint-disable-line no-console + } + } else { + const event = { + type: label, + data: payload, + } + + this.ee.emit(label, event) + } + } else { + // do nothing + console.log('[messageHandler] ignore message', type) // eslint-disable-line no-console + } + } + + on(eventName, eventHandler) { + this.ee.addListener(eventName, eventHandler) + } + + off(eventName, eventHandler) { + this.ee.removeListener(eventName, eventHandler) + } + + destroy() { + this.ee.removeAllListeners() + } +} + +export default Broker diff --git a/src/common.js b/src/common.js index cf96ea1..6f15bef 100644 --- a/src/common.js +++ b/src/common.js @@ -59,28 +59,34 @@ export const sleep = async (ms) => export async function enterServiceRoom( client, httpClient, + eventName, roomId, id, label, - minDelay, // unused parameter trackEvent, serviceName ) { + console.log('[enterServiceRoom] call', eventName, id, label, serviceName) + const backoff = new Backoff() - const EVENT_NAME = 'room.enter' const isTransportConnected = () => client.mqtt.connected let enterRoomSuccess = false let response const handler = (event) => { + console.log('[enterServiceRoom] handler call', event) if (event.data.agent_id === id) { + console.log('[enterServiceRoom] enterRoomSuccess') + enterRoomSuccess = true - client.off(EVENT_NAME, handler) + client.off(eventName, handler) + } else { + console.log('[enterServiceRoom] no agent match') } } - client.on(EVENT_NAME, handler) + client.on(eventName, handler) try { // eslint-disable-next-line no-constant-condition @@ -108,7 +114,7 @@ export async function enterServiceRoom( trackEvent('Debug', `${serviceName}.Subscription.Retry`) } } catch (error) { - client.off(EVENT_NAME, handler) + client.off(eventName, handler) backoff.reset() diff --git a/src/conference.js b/src/conference.js index 7a79cc2..c14dc6e 100644 --- a/src/conference.js +++ b/src/conference.js @@ -19,6 +19,9 @@ import Service from './service' * @property {number} video_remb */ +/** + * @deprecated Use Broker class instead of Conference class + */ class Conference extends Service { /** * Conference events enum @@ -28,7 +31,6 @@ class Conference extends Service { * ROOM_CLOSE: string, * ROOM_ENTER: string, * ROOM_LEAVE: string, - * ROOM_OPEN: string, * RTC_STREAM_AGENT_SPEAKING: string * RTC_STREAM_UPDATE: string * }} @@ -36,11 +38,10 @@ class Conference extends Service { static get events() { return { AGENT_WRITER_CONFIG_UPDATE: 'agent_writer_config.update', - GROUP_UPDATE: 'group.update', + GROUP_UPDATE: 'video_group.update', ROOM_CLOSE: 'room.close', ROOM_ENTER: 'room.enter', ROOM_LEAVE: 'room.leave', - ROOM_OPEN: 'room.open', RTC_STREAM_AGENT_SPEAKING: 'rtc_stream.agent_speaking', RTC_STREAM_UPDATE: 'rtc_stream.update', } diff --git a/src/event.js b/src/event.js index 4765200..f42d92b 100644 --- a/src/event.js +++ b/src/event.js @@ -1,5 +1,8 @@ import Service from './service' +/** + * @deprecated Use Broker class instead of Event class + */ class Event extends Service { /** * Change type enum @@ -15,13 +18,18 @@ class Event extends Service { /** * Events enum - * @returns {{AGENT_UPDATE: string, EVENT_CREATE: string, ROOM_CLOSE: string, ROOM_ENTER: string, ROOM_LEAVE: string, ROOM_UPDATE: string}} + * @returns {{ + * AGENT_UPDATE: string, + * EVENT_CREATE: string, + * ROOM_ENTER: string, + * ROOM_LEAVE: string, + * ROOM_UPDATE: string + * }} */ static get events() { return { AGENT_UPDATE: 'agent.update', - EVENT_CREATE: 'event.create', // eslint-disable-line sonarjs/no-duplicate-string - ROOM_CLOSE: 'room.close', + EVENT_CREATE: 'event.create', ROOM_ENTER: 'room.enter', ROOM_LEAVE: 'room.leave', ROOM_UPDATE: 'room.update', diff --git a/src/http-event.js b/src/http-event.js index b05cce3..8c2c5b5 100644 --- a/src/http-event.js +++ b/src/http-event.js @@ -23,6 +23,18 @@ const eventEndpoints = { } class HTTPEvent extends BasicClient { + /** + * Change type enum + * @returns {{ADDITION: string, MODIFICATION: string, REMOVAL: string}} + */ + static get changeTypes() { + return { + ADDITION: 'addition', + MODIFICATION: 'modification', + REMOVAL: 'removal', + } + } + /** * Read room * @param id diff --git a/src/index.js b/src/index.js index e007253..7159488 100644 --- a/src/index.js +++ b/src/index.js @@ -1,4 +1,5 @@ export { default as Backoff } from './backoff' +export { default as Broker } from './broker' export { default as Conference } from './conference' export { default as Dispatcher } from './dispatcher' export { default as Event } from './event' diff --git a/src/service.js b/src/service.js index c57592a..b0f8f0c 100644 --- a/src/service.js +++ b/src/service.js @@ -4,6 +4,9 @@ import EventEmitter from 'events' import Codec from './codec' +/** + * @deprecated Use Broker class instead of Service base class + */ class Service { constructor(mqttClient, agentId, appName) { this.agentId = agentId