From c8fe3b1d5c5f625ceffd3d970efed1b525267165 Mon Sep 17 00:00:00 2001 From: Krzysztof Zegzula Date: Fri, 2 Feb 2024 11:42:11 +0100 Subject: [PATCH] fix(vMix): fragmented message handling SOFIE-2932 (#320) --- .../vMixResponseStreamReader.spec.ts | 488 ++++++++++++++++++ .../integrations/vmix/__tests__/vmixMock.ts | 2 +- .../src/integrations/vmix/connection.ts | 78 +-- .../src/integrations/vmix/index.ts | 3 +- .../vmix/vMixResponseStreamReader.ts | 100 ++++ 5 files changed, 597 insertions(+), 74 deletions(-) create mode 100644 packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts create mode 100644 packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts diff --git a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts new file mode 100644 index 000000000..9e919c271 --- /dev/null +++ b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts @@ -0,0 +1,488 @@ +import { VMixResponseStreamReader } from '../vMixResponseStreamReader' + +describe('VMixResponseStreamReader', () => { + it('processes a complete message', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(1) + expect(onMessage).toHaveBeenCalledWith( + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + }) + + it('processes two complete messages', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) + reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'OK', + message: 'Take', + }) + ) + }) + + it('processes a fragmented message #1', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + reader.processIncomingData(Buffer.from('VERSION O')) + reader.processIncomingData(Buffer.from('K 27.0.0.49\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(1) + expect(onMessage).toHaveBeenCalledWith( + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + }) + + it('processes a fragmented message #2', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) + reader.processIncomingData(Buffer.from('FUNCTION')) + reader.processIncomingData(Buffer.from(' ER Error message\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'ER', + message: 'Error message', + }) + ) + }) + + it('processes a fragmented message #3', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49')) + reader.processIncomingData(Buffer.from('\r\n')) + reader.processIncomingData(Buffer.from('FUNCTION')) + reader.processIncomingData(Buffer.from(' ER Error message\r')) + reader.processIncomingData(Buffer.from('\n')) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'ER', + message: 'Error message', + }) + ) + }) + + it('processes combined messages #1', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION ER Error message\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'ER', + message: 'Error message', + }) + ) + }) + + it('processes combined messages #2', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E')) + reader.processIncomingData(Buffer.from('R Error message\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'ER', + message: 'Error message', + }) + ) + }) + + it('processes combined messages #3', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E')) + reader.processIncomingData(Buffer.from('R Error message\r\nFUNCTION OK T')) + reader.processIncomingData(Buffer.from('ake\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(3) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'ER', + message: 'Error message', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 3, + expect.objectContaining({ + command: 'FUNCTION', + response: 'OK', + message: 'Take', + }) + ) + }) + + it('processes a message with data', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + const xmlString = + '27.0.0.49HDC:\\preset.vmix' + reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString))) + + expect(onMessage).toHaveBeenCalledTimes(1) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString, + }) + ) + }) + + it('processes a multiline message with data', async () => { + // note: I don't know if those can actually be encountered + + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + const xmlString = + '\r\n27.0.0.49\r\nHD\r\nC:\\preset.vmix\r\n\r\n' + reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString))) + + expect(onMessage).toHaveBeenCalledTimes(1) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString, + }) + ) + }) + + it('processes a fragmented message with data', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + const xmlString = + '27.0.0.49HDC:\\preset.vmix' + const xmlMessage = makeXmlMessage(xmlString) + splitAtIndices(xmlMessage, [2, 10, 25, 40]).forEach((fragment) => { + expect(fragment.length).toBeGreaterThan(0) + reader.processIncomingData(Buffer.from(fragment)) + }) + + expect(onMessage).toHaveBeenCalledTimes(1) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString, + }) + ) + }) + + it('processes combined messages with data', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + const xmlString = + '27.0.0.49HDC:\\preset.vmix' + const xmlString2 = '25.0.0.14K' + + reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString) + makeXmlMessage(xmlString2))) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString, + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString2, + }) + ) + }) + + it('processes separate messages with data', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + const xmlString = + '27.0.0.49HDC:\\preset.vmix' + const xmlString2 = '25.0.0.14K' + + reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString))) + reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2))) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString, + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString2, + }) + ) + }) + + it('can be reset during incomplete message', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + const xmlString = + '27.0.0.49HDC:\\preset.vmix' + const xmlString2 = '25.0.0.14K' + + reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString).substring(0, 44))) + reader.reset() + reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2))) + + expect(onMessage).toHaveBeenCalledTimes(1) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString2, + }) + ) + }) + + it('catches errors thrown in response handler', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn(() => { + throw Error('Something went wrong') + }) + reader.on('response', onMessage) + const onError = jest.fn() + reader.on('error', onError) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) + reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'OK', + message: 'Take', + }) + ) + + expect(onError).toHaveBeenCalledTimes(2) + }) + + it('rejects empty messages silently', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + const onError = jest.fn() + reader.on('error', onError) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) + reader.processIncomingData(Buffer.from('\r\n')) + reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'OK', + message: 'Take', + }) + ) + + expect(onError).toHaveBeenCalledTimes(0) + }) + + it('rejects invalid messages and emits error', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + const onError = jest.fn() + reader.on('error', onError) + + reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) + reader.processIncomingData(Buffer.from('WASSUP\r\n')) + reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n')) + + expect(onMessage).toHaveBeenCalledTimes(2) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'VERSION', + response: 'OK', + }) + ) + expect(onMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + command: 'FUNCTION', + response: 'OK', + message: 'Take', + }) + ) + + expect(onError).toHaveBeenCalledTimes(1) + }) +}) + +function makeXmlMessage(xmlString: string): string { + return `XML ${xmlString.length + 2}\r\n${xmlString}\r\n` +} + +function splitAtIndices(text: string, indices: number[]) { + const result: string[] = [] + let lastIndex = 0 + + indices.forEach((index) => { + if (index > lastIndex && index < text.length) { + result.push(text.substring(lastIndex, index)) + lastIndex = index + } + }) + + if (lastIndex < text.length) { + result.push(text.substring(lastIndex)) + } + + return result +} diff --git a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts index 2240abb41..f298246f6 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts @@ -46,7 +46,7 @@ ${inputs?.join('\r\n') ?? defaultInputs}\r\n -` +\r\n` } export function setupVmixMock() { diff --git a/packages/timeline-state-resolver/src/integrations/vmix/connection.ts b/packages/timeline-state-resolver/src/integrations/vmix/connection.ts index 07b7d570a..4efae0d50 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/connection.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/connection.ts @@ -2,9 +2,9 @@ import { EventEmitter } from 'eventemitter3' import { Socket } from 'net' import { VMixCommand } from 'timeline-state-resolver-types' import { VMixStateCommand } from './vMixCommands' +import { Response, VMixResponseStreamReader } from './vMixResponseStreamReader' const VMIX_DEFAULT_TCP_PORT = 8099 -const RESPONSE_REGEX = /^(?\w+)\s+(?OK|ER|\d+)(\s+(?.*))?/i export enum ResponseTypes { Info = 'INFO', @@ -21,13 +21,6 @@ export type ConnectionEvents = { error: [error: Error] } -export interface Response { - command: string - response: 'OK' | 'ER' - message: string - body?: string -} - /** * This TSR integration polls the state of vMix and merges that into our last-known state. * However, not all state properties can be retried from vMix's API. @@ -38,13 +31,15 @@ export type InferredPartialInputStateKeys = 'filePath' | 'fade' | 'audioAuto' | export class BaseConnection extends EventEmitter { private _socket?: Socket - private _unprocessedLines: string[] = [] private _reconnectTimeout?: NodeJS.Timeout private _connected = false + private _responseStreamReader = new VMixResponseStreamReader() constructor(private host: string, private port = VMIX_DEFAULT_TCP_PORT, autoConnect = false) { super() if (autoConnect) this._setupSocket() + this._responseStreamReader.on('response', (response) => this.emit('data', response)) + this._responseStreamReader.on('error', (error) => this.emit('error', error)) } get connected(): boolean { @@ -96,68 +91,6 @@ export class BaseConnection extends EventEmitter { }) } - private async _processIncomingData(data: Buffer) { - const string = data.toString('utf-8') - const newLines = string.split('\r\n') - - this._unprocessedLines.push(...newLines) - - lineProcessing: while (this._unprocessedLines.length > 0) { - const firstLine = this._unprocessedLines[0] - const result = RESPONSE_REGEX.exec(firstLine) - let processedLines = 0 - - if (result && result.groups?.['response']) { - // create a response object - // Add 2 to account for the space between `command` and `response` as well as the newline after `response`. - const responseHeaderLength = result.groups?.['command'].length + result.groups?.['response'].length + 2 - if (Number.isNaN(responseHeaderLength)) { - break lineProcessing - } - const responseLen = parseInt(result?.groups?.['response']) - responseHeaderLength - const response: Response = { - command: result.groups?.['command'], - response: (Number.isNaN(responseLen) ? result.groups?.['response'] : 'OK') as Response['response'], - message: result.groups?.['responseMsg'], - body: undefined as undefined | string, - } - processedLines++ - - // parse payload data if there is any - if (!Number.isNaN(responseLen)) { - let len = responseLen - const lines: string[] = [] - - while (len > 0) { - const l = this._unprocessedLines[lines.length + 1] // offset of 1 because first line is not data - if (l === undefined) { - // we have not received all the data from server, break line processing and wait for more data - break lineProcessing - } - - len -= l.length + 2 - lines.push(l) - } - response.body = lines.join('') - processedLines += lines.length - } - - // now do something with response - this.emit('data', response) - } else if (firstLine.length > 0) { - // there is some data, but we can't recognize it, emit an error - this.emit('error', new Error(`Unknown response from vMix: "${firstLine}"`)) - processedLines++ - } else { - // empty lines we silently ignore - processedLines++ - } - - // remove processed lines - this._unprocessedLines.splice(0, processedLines) - } - } - private _triggerReconnect() { if (!this._reconnectTimeout) { this._reconnectTimeout = setTimeout(() => { @@ -181,10 +114,11 @@ export class BaseConnection extends EventEmitter { this._socket.setEncoding('utf-8') this._socket.on('data', (data) => { - this._processIncomingData(data).catch((e) => this.emit('error', e)) + this._responseStreamReader.processIncomingData(data) }) this._socket.on('connect', () => { this._setConnected(true) + this._responseStreamReader.reset() }) this._socket.on('close', () => { this._setConnected(false) diff --git a/packages/timeline-state-resolver/src/integrations/vmix/index.ts b/packages/timeline-state-resolver/src/integrations/vmix/index.ts index fa49f4f75..c8a0d35ae 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/index.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/index.ts @@ -2,7 +2,7 @@ import * as _ from 'underscore' import { DeviceWithState, CommandWithContext, DeviceStatus, StatusCode } from './../../devices/device' import { DoOnTime, SendMode } from '../../devices/doOnTime' -import { Response, VMixConnection } from './connection' +import { VMixConnection } from './connection' import { DeviceType, DeviceOptionsVMix, @@ -23,6 +23,7 @@ import { MappingsVmix, VMixTimelineStateConverter } from './vMixTimelineStateCon import { VMixXmlStateParser } from './vMixXmlStateParser' import { VMixPollingTimer } from './vMixPollingTimer' import { VMixStateSynchronizer } from './vMixStateSynchronizer' +import { Response } from './vMixResponseStreamReader' /** * Default time, in milliseconds, for when we should poll vMix to query its actual state. diff --git a/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts b/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts new file mode 100644 index 000000000..7657f82b0 --- /dev/null +++ b/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts @@ -0,0 +1,100 @@ +import { EventEmitter } from 'eventemitter3' + +type ResponseStreamReaderEvents = { + response: [response: Response] + error: [error: Error] +} + +export interface Response { + command: string + response: 'OK' | 'ER' + message: string + body?: string +} + +const RESPONSE_REGEX = /^(?\w+)\s+(?OK|ER|\d+)(\s+(?.*))?/i + +/** + * A receiver for vMix responses + */ +export class VMixResponseStreamReader extends EventEmitter { + private _unprocessedLines: string[] = [] + private _lineRemainder = '' + + reset() { + this._unprocessedLines = [] + this._lineRemainder = '' + } + + processIncomingData(data: Buffer) { + const string = this._lineRemainder + data.toString('utf-8') + this._lineRemainder = '' + const lines = string.split('\r\n') + const lastChunk = lines.pop() + + if (lastChunk != null && lastChunk !== '') { + // Incomplete line found at the end - keep it + this._lineRemainder = lastChunk + } + this._unprocessedLines.push(...lines) + + let lineToProcess: string | undefined + + while ((lineToProcess = this._unprocessedLines.shift()) != null) { + const result = RESPONSE_REGEX.exec(lineToProcess) + + if (result && result.groups?.['response']) { + try { + const responseLen = parseInt(result?.groups?.['response']) + + // create a response object + const response: Response = { + command: result.groups?.['command'], + response: (Number.isNaN(responseLen) ? result.groups?.['response'] : 'OK') as Response['response'], + message: result.groups?.['responseMsg'], + body: undefined as undefined | string, + } + + // parse payload data if there is any + if (!Number.isNaN(responseLen)) { + const payloadData = this.processPayloadData(responseLen) + if (payloadData == null) { + // put the command back as we haven't received enough data + this._unprocessedLines.unshift(lineToProcess) + break + } else { + response.body = payloadData + } + } + + // now do something with response + this.emit('response', response) + } catch (e) { + this.emit('error', e instanceof Error ? e : new Error(`Couldn't process the response: "${lineToProcess}"`)) + } + } else if (lineToProcess.length > 0) { + // there is some data, but we can't recognize it, emit an error + this.emit('error', new Error(`Unknown response from vMix: "${lineToProcess}"`)) + } else { + // empty lines we silently ignore + } + } + } + + private processPayloadData(responseLen: number): string | null { + const processedLines: string[] = [] + + while (responseLen > 0) { + const line = this._unprocessedLines[processedLines.length] + if (line == null) { + // we have not received all the data from server, break line processing and wait for more data + return null + } + + processedLines.push(line) + responseLen -= line.length + 2 + } + this._unprocessedLines.splice(0, processedLines.length) + return processedLines.join('\r\n') + } +}