From 332ce673c30d78783f6b91e0c3b342bb9d6f6c98 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 29 Sep 2023 11:19:25 +0100 Subject: [PATCH] fix: improve response parsing to handle tcp fragmentation. if parsing a response fails, produce an error response, instead of a connection error --- src/__tests__/connection.spec.ts | 132 ++++++++++++++++++++++++++++--- src/api.ts | 2 +- src/connection.ts | 61 ++++++++++---- 3 files changed, 165 insertions(+), 30 deletions(-) diff --git a/src/__tests__/connection.spec.ts b/src/__tests__/connection.spec.ts index 5dd0ee08..47307e68 100644 --- a/src/__tests__/connection.spec.ts +++ b/src/__tests__/connection.spec.ts @@ -5,6 +5,7 @@ import { deserializers } from '../deserializers' import { Socket as OrgSocket } from 'net' import { Socket as MockSocket } from '../__mocks__/net' import { Commands } from '../commands' +import { BasicCasparCGAPI } from '../api' jest.mock('net') @@ -56,6 +57,14 @@ describe('connection', () => { socket.onClose = onSocketClose }) } + + function extractReqId(index: number) { + const str = onSocketWrite.mock.calls[index - 1][0] + const match = str.match(/REQ (\w+) /) + if (!match) throw new Error(`Failed to find REQ id in "${str}"`) + return match[1] + } + beforeEach(() => { setupSocketMock() }) @@ -110,7 +119,7 @@ describe('connection', () => { ) // Wait for deserializer to run - await new Promise(process.nextTick.bind(process)) + await new Promise(setImmediate) expect(onConnError).toHaveBeenCalledTimes(0) expect(onConnData).toHaveBeenCalledTimes(1) @@ -167,7 +176,7 @@ describe('connection', () => { sockets[0].mockData(Buffer.from(`\r\n\r\n`)) // Wait for deserializer to run - await new Promise(process.nextTick.bind(process)) + await new Promise(setImmediate) expect(onConnError).toHaveBeenCalledTimes(0) expect(onConnData).toHaveBeenCalledTimes(1) @@ -243,9 +252,7 @@ describe('connection', () => { sockets[0].mockData(Buffer.from(`RES cmd2 202 PLAY OK\r\n`)) // Wait for deserializer to run - await new Promise(process.nextTick.bind(process)) - await new Promise(process.nextTick.bind(process)) - await new Promise(process.nextTick.bind(process)) + await new Promise(setImmediate) expect(onConnError).toHaveBeenCalledTimes(0) expect(onConnData).toHaveBeenCalledTimes(2) @@ -321,19 +328,26 @@ describe('connection', () => { expect(onSocketWrite).toHaveBeenNthCalledWith(2, 'REQ cmd2 PLAY 1-10\r\n', 'utf-8') // Reply with a blob designed to crash the xml parser - sockets[0].mockData(Buffer.from(`201 INFO OK\r\n { conn.disconnect() } }) + + it('test with full connection', async () => { + const client = new BasicCasparCGAPI({ + host: '127.0.0.1', + port: 5250, + autoConnect: true, + }) + try { + expect(client).toBeTruthy() + + const onConnError = jest.fn() + // const onConnData = jest.fn() + client.on('error', onConnError) + // client.on('data', onConnData) + + const onCommandOk = jest.fn() + const onCommandError = jest.fn() + + const sockets = SocketMock.openSockets() + expect(sockets).toHaveLength(1) + + // Dispatch a command + const sendError = await client.executeCommand({ + command: Commands.Info, + params: {}, + }) + sendError.request?.then(onCommandOk, onCommandError) + const sendError2 = await client.executeCommand({ + command: Commands.Play, + params: { + channel: 1, + layer: 10, + }, + }) + sendError2.request?.then(onCommandOk, onCommandError) + expect(onConnError).toHaveBeenCalledTimes(0) + expect(onCommandOk).toHaveBeenCalledTimes(0) + expect(onCommandError).toHaveBeenCalledTimes(0) + + // Info was sent + expect(onSocketWrite).toHaveBeenCalledTimes(2) + expect(onSocketWrite).toHaveBeenNthCalledWith(1, expect.stringMatching(/REQ (\w+) INFO\r\n/), 'utf-8') + expect(onSocketWrite).toHaveBeenNthCalledWith( + 2, + expect.stringMatching(/REQ (\w+) PLAY 1-10\r\n/), + 'utf-8' + ) + + // Reply with a blob designed to crash the xml parser + const infoReqId = extractReqId(1) + sockets[0].mockData(Buffer.from(`RES ${infoReqId} 201 INFO OK\r\n { private _socket?: Socket + private _unprocessedData = '' private _unprocessedLines: string[] = [] private _reconnectTimeout?: NodeJS.Timeout private _connected = false @@ -125,20 +126,25 @@ export class Connection extends EventEmitter { }) } - private async _processIncomingData(data: Buffer) { - const string = data.toString('utf-8') - const newLines = string.split('\r\n') - + private _processIncomingData(data: Buffer) { + /** + * This is a simple strategy to handle receiving newline separated data, factoring in arbitrary TCP fragmentation. + * It is common for a long response to be split across multiple packets, most likely with the split happening in the middle of a line. + */ + this._unprocessedData += data.toString('utf-8') + const newLines = this._unprocessedData.split('\r\n') + // Pop and preserve the last fragment as unprocessed. In most cases this will be an empty string, but it could be the first portion of a line + this._unprocessedData = newLines.pop() ?? '' this._unprocessedLines.push(...newLines) while (this._unprocessedLines.length > 0) { const result = RESPONSE_REGEX.exec(this._unprocessedLines[0]) let processedLines = 0 - if (result && result.groups?.['ResponseCode']) { + if (result?.groups?.['ResponseCode']) { // create a response object const responseCode = parseInt(result?.groups?.['ResponseCode']) - const response = { + const response: Response = { reqId: result?.groups?.['ReqId'], command: result?.groups?.['Action'] as Commands, responseCode, @@ -149,22 +155,41 @@ export class Connection extends EventEmitter { // parse additional lines if needed if (response.responseCode === 200) { + const indexOfBlankLine = this._unprocessedLines.indexOf('') + if (indexOfBlankLine === -1) break // No termination yet, try again later + // multiple lines of data - response.data = this._unprocessedLines.slice(1, this._unprocessedLines.indexOf('')) + response.data = this._unprocessedLines.slice(1, indexOfBlankLine) processedLines += response.data.length + 1 // data lines + 1 empty line } else if (response.responseCode === 201 || response.responseCode === 400) { + if (this._unprocessedLines.length < 2) break // No data line, try again later + response.data = [this._unprocessedLines[1]] processedLines++ } - const deserializers = this._getVersionedDeserializers() - // attempt to deserialize the response if we can - if (deserializers[response.command] && response.data.length) { - response.data = await deserializers[response.command](response.data) - } - - // now do something with response - this.emit('data', response) + // Parse the command after `this._unprocessedLines` has been updated + setImmediate(() => { + Promise.resolve() + .then(async () => { + const deserializers = this._getVersionedDeserializers() + // attempt to deserialize the response if we can + if (deserializers[response.command] && response.data.length) { + response.data = await deserializers[response.command](response.data) + } + + // now do something with response + this.emit('data', response) + }) + .catch(() => { + this.emit('data', { + ...response, + responseCode: 500, // TODO better value? + type: ResponseTypes.ServerError, + message: 'Invalid response received.', + }) + }) + }) } else { // well this is not happy, do we do something? // perhaps this is the infamous 100 or 101 response code, although that doesn't appear in casparcg source code @@ -198,7 +223,11 @@ export class Connection extends EventEmitter { this._socket.setEncoding('utf-8') this._socket.on('data', (data) => { - this._processIncomingData(data).catch((e) => this.emit('error', e)) + try { + this._processIncomingData(data) + } catch (e: any) { + this.emit('error', e) + } }) this._socket.on('connect', () => { this._setConnected(true)