diff --git a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts
new file mode 100644
index 000000000..9e919c271
--- /dev/null
+++ b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts
@@ -0,0 +1,488 @@
+import { VMixResponseStreamReader } from '../vMixResponseStreamReader'
+
+describe('VMixResponseStreamReader', () => {
+ it('processes a complete message', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(1)
+ expect(onMessage).toHaveBeenCalledWith(
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ })
+
+ it('processes two complete messages', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
+ reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'OK',
+ message: 'Take',
+ })
+ )
+ })
+
+ it('processes a fragmented message #1', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ reader.processIncomingData(Buffer.from('VERSION O'))
+ reader.processIncomingData(Buffer.from('K 27.0.0.49\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(1)
+ expect(onMessage).toHaveBeenCalledWith(
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ })
+
+ it('processes a fragmented message #2', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
+ reader.processIncomingData(Buffer.from('FUNCTION'))
+ reader.processIncomingData(Buffer.from(' ER Error message\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'ER',
+ message: 'Error message',
+ })
+ )
+ })
+
+ it('processes a fragmented message #3', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49'))
+ reader.processIncomingData(Buffer.from('\r\n'))
+ reader.processIncomingData(Buffer.from('FUNCTION'))
+ reader.processIncomingData(Buffer.from(' ER Error message\r'))
+ reader.processIncomingData(Buffer.from('\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'ER',
+ message: 'Error message',
+ })
+ )
+ })
+
+ it('processes combined messages #1', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION ER Error message\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'ER',
+ message: 'Error message',
+ })
+ )
+ })
+
+ it('processes combined messages #2', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E'))
+ reader.processIncomingData(Buffer.from('R Error message\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'ER',
+ message: 'Error message',
+ })
+ )
+ })
+
+ it('processes combined messages #3', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E'))
+ reader.processIncomingData(Buffer.from('R Error message\r\nFUNCTION OK T'))
+ reader.processIncomingData(Buffer.from('ake\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(3)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'ER',
+ message: 'Error message',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 3,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'OK',
+ message: 'Take',
+ })
+ )
+ })
+
+ it('processes a message with data', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ const xmlString =
+ '27.0.0.49HDC:\\preset.vmix'
+ reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
+
+ expect(onMessage).toHaveBeenCalledTimes(1)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'XML',
+ response: 'OK',
+ body: xmlString,
+ })
+ )
+ })
+
+ it('processes a multiline message with data', async () => {
+ // note: I don't know if those can actually be encountered
+
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ const xmlString =
+ '\r\n27.0.0.49\r\nHD\r\nC:\\preset.vmix\r\n\r\n'
+ reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
+
+ expect(onMessage).toHaveBeenCalledTimes(1)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'XML',
+ response: 'OK',
+ body: xmlString,
+ })
+ )
+ })
+
+ it('processes a fragmented message with data', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ const xmlString =
+ '27.0.0.49HDC:\\preset.vmix'
+ const xmlMessage = makeXmlMessage(xmlString)
+ splitAtIndices(xmlMessage, [2, 10, 25, 40]).forEach((fragment) => {
+ expect(fragment.length).toBeGreaterThan(0)
+ reader.processIncomingData(Buffer.from(fragment))
+ })
+
+ expect(onMessage).toHaveBeenCalledTimes(1)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'XML',
+ response: 'OK',
+ body: xmlString,
+ })
+ )
+ })
+
+ it('processes combined messages with data', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ const xmlString =
+ '27.0.0.49HDC:\\preset.vmix'
+ const xmlString2 = '25.0.0.14K'
+
+ reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString) + makeXmlMessage(xmlString2)))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'XML',
+ response: 'OK',
+ body: xmlString,
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'XML',
+ response: 'OK',
+ body: xmlString2,
+ })
+ )
+ })
+
+ it('processes separate messages with data', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ const xmlString =
+ '27.0.0.49HDC:\\preset.vmix'
+ const xmlString2 = '25.0.0.14K'
+
+ reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
+ reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2)))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'XML',
+ response: 'OK',
+ body: xmlString,
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'XML',
+ response: 'OK',
+ body: xmlString2,
+ })
+ )
+ })
+
+ it('can be reset during incomplete message', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+
+ const xmlString =
+ '27.0.0.49HDC:\\preset.vmix'
+ const xmlString2 = '25.0.0.14K'
+
+ reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString).substring(0, 44)))
+ reader.reset()
+ reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2)))
+
+ expect(onMessage).toHaveBeenCalledTimes(1)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'XML',
+ response: 'OK',
+ body: xmlString2,
+ })
+ )
+ })
+
+ it('catches errors thrown in response handler', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn(() => {
+ throw Error('Something went wrong')
+ })
+ reader.on('response', onMessage)
+ const onError = jest.fn()
+ reader.on('error', onError)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
+ reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'OK',
+ message: 'Take',
+ })
+ )
+
+ expect(onError).toHaveBeenCalledTimes(2)
+ })
+
+ it('rejects empty messages silently', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+ const onError = jest.fn()
+ reader.on('error', onError)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
+ reader.processIncomingData(Buffer.from('\r\n'))
+ reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'OK',
+ message: 'Take',
+ })
+ )
+
+ expect(onError).toHaveBeenCalledTimes(0)
+ })
+
+ it('rejects invalid messages and emits error', async () => {
+ const reader = new VMixResponseStreamReader()
+
+ const onMessage = jest.fn()
+ reader.on('response', onMessage)
+ const onError = jest.fn()
+ reader.on('error', onError)
+
+ reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
+ reader.processIncomingData(Buffer.from('WASSUP\r\n'))
+ reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
+
+ expect(onMessage).toHaveBeenCalledTimes(2)
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 1,
+ expect.objectContaining({
+ command: 'VERSION',
+ response: 'OK',
+ })
+ )
+ expect(onMessage).toHaveBeenNthCalledWith(
+ 2,
+ expect.objectContaining({
+ command: 'FUNCTION',
+ response: 'OK',
+ message: 'Take',
+ })
+ )
+
+ expect(onError).toHaveBeenCalledTimes(1)
+ })
+})
+
+function makeXmlMessage(xmlString: string): string {
+ return `XML ${xmlString.length + 2}\r\n${xmlString}\r\n`
+}
+
+function splitAtIndices(text: string, indices: number[]) {
+ const result: string[] = []
+ let lastIndex = 0
+
+ indices.forEach((index) => {
+ if (index > lastIndex && index < text.length) {
+ result.push(text.substring(lastIndex, index))
+ lastIndex = index
+ }
+ })
+
+ if (lastIndex < text.length) {
+ result.push(text.substring(lastIndex))
+ }
+
+ return result
+}
diff --git a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts
index 2240abb41..f298246f6 100644
--- a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts
+++ b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts
@@ -46,7 +46,7 @@ ${inputs?.join('\r\n') ?? defaultInputs}\r\n
-`
+\r\n`
}
export function setupVmixMock() {
diff --git a/packages/timeline-state-resolver/src/integrations/vmix/connection.ts b/packages/timeline-state-resolver/src/integrations/vmix/connection.ts
index 07b7d570a..4efae0d50 100644
--- a/packages/timeline-state-resolver/src/integrations/vmix/connection.ts
+++ b/packages/timeline-state-resolver/src/integrations/vmix/connection.ts
@@ -2,9 +2,9 @@ import { EventEmitter } from 'eventemitter3'
import { Socket } from 'net'
import { VMixCommand } from 'timeline-state-resolver-types'
import { VMixStateCommand } from './vMixCommands'
+import { Response, VMixResponseStreamReader } from './vMixResponseStreamReader'
const VMIX_DEFAULT_TCP_PORT = 8099
-const RESPONSE_REGEX = /^(?\w+)\s+(?OK|ER|\d+)(\s+(?.*))?/i
export enum ResponseTypes {
Info = 'INFO',
@@ -21,13 +21,6 @@ export type ConnectionEvents = {
error: [error: Error]
}
-export interface Response {
- command: string
- response: 'OK' | 'ER'
- message: string
- body?: string
-}
-
/**
* This TSR integration polls the state of vMix and merges that into our last-known state.
* However, not all state properties can be retried from vMix's API.
@@ -38,13 +31,15 @@ export type InferredPartialInputStateKeys = 'filePath' | 'fade' | 'audioAuto' |
export class BaseConnection extends EventEmitter {
private _socket?: Socket
- private _unprocessedLines: string[] = []
private _reconnectTimeout?: NodeJS.Timeout
private _connected = false
+ private _responseStreamReader = new VMixResponseStreamReader()
constructor(private host: string, private port = VMIX_DEFAULT_TCP_PORT, autoConnect = false) {
super()
if (autoConnect) this._setupSocket()
+ this._responseStreamReader.on('response', (response) => this.emit('data', response))
+ this._responseStreamReader.on('error', (error) => this.emit('error', error))
}
get connected(): boolean {
@@ -96,68 +91,6 @@ export class BaseConnection extends EventEmitter {
})
}
- private async _processIncomingData(data: Buffer) {
- const string = data.toString('utf-8')
- const newLines = string.split('\r\n')
-
- this._unprocessedLines.push(...newLines)
-
- lineProcessing: while (this._unprocessedLines.length > 0) {
- const firstLine = this._unprocessedLines[0]
- const result = RESPONSE_REGEX.exec(firstLine)
- let processedLines = 0
-
- if (result && result.groups?.['response']) {
- // create a response object
- // Add 2 to account for the space between `command` and `response` as well as the newline after `response`.
- const responseHeaderLength = result.groups?.['command'].length + result.groups?.['response'].length + 2
- if (Number.isNaN(responseHeaderLength)) {
- break lineProcessing
- }
- const responseLen = parseInt(result?.groups?.['response']) - responseHeaderLength
- const response: Response = {
- command: result.groups?.['command'],
- response: (Number.isNaN(responseLen) ? result.groups?.['response'] : 'OK') as Response['response'],
- message: result.groups?.['responseMsg'],
- body: undefined as undefined | string,
- }
- processedLines++
-
- // parse payload data if there is any
- if (!Number.isNaN(responseLen)) {
- let len = responseLen
- const lines: string[] = []
-
- while (len > 0) {
- const l = this._unprocessedLines[lines.length + 1] // offset of 1 because first line is not data
- if (l === undefined) {
- // we have not received all the data from server, break line processing and wait for more data
- break lineProcessing
- }
-
- len -= l.length + 2
- lines.push(l)
- }
- response.body = lines.join('')
- processedLines += lines.length
- }
-
- // now do something with response
- this.emit('data', response)
- } else if (firstLine.length > 0) {
- // there is some data, but we can't recognize it, emit an error
- this.emit('error', new Error(`Unknown response from vMix: "${firstLine}"`))
- processedLines++
- } else {
- // empty lines we silently ignore
- processedLines++
- }
-
- // remove processed lines
- this._unprocessedLines.splice(0, processedLines)
- }
- }
-
private _triggerReconnect() {
if (!this._reconnectTimeout) {
this._reconnectTimeout = setTimeout(() => {
@@ -181,10 +114,11 @@ export class BaseConnection extends EventEmitter {
this._socket.setEncoding('utf-8')
this._socket.on('data', (data) => {
- this._processIncomingData(data).catch((e) => this.emit('error', e))
+ this._responseStreamReader.processIncomingData(data)
})
this._socket.on('connect', () => {
this._setConnected(true)
+ this._responseStreamReader.reset()
})
this._socket.on('close', () => {
this._setConnected(false)
diff --git a/packages/timeline-state-resolver/src/integrations/vmix/index.ts b/packages/timeline-state-resolver/src/integrations/vmix/index.ts
index fa49f4f75..c8a0d35ae 100644
--- a/packages/timeline-state-resolver/src/integrations/vmix/index.ts
+++ b/packages/timeline-state-resolver/src/integrations/vmix/index.ts
@@ -2,7 +2,7 @@ import * as _ from 'underscore'
import { DeviceWithState, CommandWithContext, DeviceStatus, StatusCode } from './../../devices/device'
import { DoOnTime, SendMode } from '../../devices/doOnTime'
-import { Response, VMixConnection } from './connection'
+import { VMixConnection } from './connection'
import {
DeviceType,
DeviceOptionsVMix,
@@ -23,6 +23,7 @@ import { MappingsVmix, VMixTimelineStateConverter } from './vMixTimelineStateCon
import { VMixXmlStateParser } from './vMixXmlStateParser'
import { VMixPollingTimer } from './vMixPollingTimer'
import { VMixStateSynchronizer } from './vMixStateSynchronizer'
+import { Response } from './vMixResponseStreamReader'
/**
* Default time, in milliseconds, for when we should poll vMix to query its actual state.
diff --git a/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts b/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts
new file mode 100644
index 000000000..7657f82b0
--- /dev/null
+++ b/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts
@@ -0,0 +1,100 @@
+import { EventEmitter } from 'eventemitter3'
+
+type ResponseStreamReaderEvents = {
+ response: [response: Response]
+ error: [error: Error]
+}
+
+export interface Response {
+ command: string
+ response: 'OK' | 'ER'
+ message: string
+ body?: string
+}
+
+const RESPONSE_REGEX = /^(?\w+)\s+(?OK|ER|\d+)(\s+(?.*))?/i
+
+/**
+ * A receiver for vMix responses
+ */
+export class VMixResponseStreamReader extends EventEmitter {
+ private _unprocessedLines: string[] = []
+ private _lineRemainder = ''
+
+ reset() {
+ this._unprocessedLines = []
+ this._lineRemainder = ''
+ }
+
+ processIncomingData(data: Buffer) {
+ const string = this._lineRemainder + data.toString('utf-8')
+ this._lineRemainder = ''
+ const lines = string.split('\r\n')
+ const lastChunk = lines.pop()
+
+ if (lastChunk != null && lastChunk !== '') {
+ // Incomplete line found at the end - keep it
+ this._lineRemainder = lastChunk
+ }
+ this._unprocessedLines.push(...lines)
+
+ let lineToProcess: string | undefined
+
+ while ((lineToProcess = this._unprocessedLines.shift()) != null) {
+ const result = RESPONSE_REGEX.exec(lineToProcess)
+
+ if (result && result.groups?.['response']) {
+ try {
+ const responseLen = parseInt(result?.groups?.['response'])
+
+ // create a response object
+ const response: Response = {
+ command: result.groups?.['command'],
+ response: (Number.isNaN(responseLen) ? result.groups?.['response'] : 'OK') as Response['response'],
+ message: result.groups?.['responseMsg'],
+ body: undefined as undefined | string,
+ }
+
+ // parse payload data if there is any
+ if (!Number.isNaN(responseLen)) {
+ const payloadData = this.processPayloadData(responseLen)
+ if (payloadData == null) {
+ // put the command back as we haven't received enough data
+ this._unprocessedLines.unshift(lineToProcess)
+ break
+ } else {
+ response.body = payloadData
+ }
+ }
+
+ // now do something with response
+ this.emit('response', response)
+ } catch (e) {
+ this.emit('error', e instanceof Error ? e : new Error(`Couldn't process the response: "${lineToProcess}"`))
+ }
+ } else if (lineToProcess.length > 0) {
+ // there is some data, but we can't recognize it, emit an error
+ this.emit('error', new Error(`Unknown response from vMix: "${lineToProcess}"`))
+ } else {
+ // empty lines we silently ignore
+ }
+ }
+ }
+
+ private processPayloadData(responseLen: number): string | null {
+ const processedLines: string[] = []
+
+ while (responseLen > 0) {
+ const line = this._unprocessedLines[processedLines.length]
+ if (line == null) {
+ // we have not received all the data from server, break line processing and wait for more data
+ return null
+ }
+
+ processedLines.push(line)
+ responseLen -= line.length + 2
+ }
+ this._unprocessedLines.splice(0, processedLines.length)
+ return processedLines.join('\r\n')
+ }
+}