From b5c8c1c109d058f738713106abbd69d7d610076f Mon Sep 17 00:00:00 2001 From: ianshade Date: Thu, 26 Oct 2023 11:15:41 +0200 Subject: [PATCH] fix: make bridge and device initialization more robust --- apps/app/src/electron/bridgeHandler.ts | 48 +++++++++++++++----------- shared/packages/tsr-bridge/src/TSR.ts | 47 ++++++++++++++++++------- 2 files changed, 63 insertions(+), 32 deletions(-) diff --git a/apps/app/src/electron/bridgeHandler.ts b/apps/app/src/electron/bridgeHandler.ts index 60dc8f34..e98b5aa9 100644 --- a/apps/app/src/electron/bridgeHandler.ts +++ b/apps/app/src/electron/bridgeHandler.ts @@ -33,8 +33,6 @@ import { AnalogInput } from '../models/project/AnalogInput' export const { version: CURRENT_VERSION }: { version: string } = require('../../package.json') export const SERVER_PORT = 5400 -type AnyBridgeConnection = WebsocketBridgeConnection | LocalBridgeConnection - /** This handles connected bridges */ export class BridgeHandler { server: WebsocketServer @@ -48,7 +46,7 @@ export class BridgeHandler { } > = new Map() private internalBridge: LocalBridgeConnection | null = null - private connectedBridges: Array = [] + private connectedBridges: Map = new Map() private mappings: Mappings = {} private timelines: { [timelineId: string]: TSRTimeline } = {} private settings: Map = new Map() @@ -73,6 +71,18 @@ export class BridgeHandler { ...callbacks, getMappings: () => this.mappings, getTimelines: () => this.timelines, + onInitialized: (connection) => { + if (connection.bridgeId == null) return + this.connectedBridges.set(connection.bridgeId, connection) + }, + onDisconnected: (connection) => { + if (connection.bridgeId == null) return + const currentConnection = this.connectedBridges.get(connection.bridgeId) + if (currentConnection === connection) { + // delete only if that's the same connection, otherwise a new one might have been initiated before this one fully closed + this.connectedBridges.delete(connection.bridgeId) + } + }, } this.server = new WebsocketServer(this.log, SERVER_PORT, (connection: WebsocketConnection) => { // On connection: @@ -89,10 +99,9 @@ export class BridgeHandler { for (const [bridgeId, outgoing] of this.outgoingBridges.entries()) { if (outgoing.connection?.connectionId === connection.connectionId) { bridge.bridgeId = bridgeId + this.connectedBridges.set(bridgeId, bridge) } } - - this.connectedBridges.push(bridge) }) this.server.on('close', () => { @@ -108,8 +117,8 @@ export class BridgeHandler { this.reconnectToBridges() }, 1000) } - getBridgeConnection(bridgeId: BridgeId): AnyBridgeConnection | undefined { - return this.connectedBridges.find((b) => b.bridgeId === bridgeId) + getBridgeConnection(bridgeId: BridgeId): AbstractBridgeConnection | undefined { + return this.connectedBridges.get(bridgeId) } async onClose(): Promise { if (this.internalBridge) { @@ -129,18 +138,13 @@ export class BridgeHandler { this.storage, this.connectionCallbacks ) - this.connectedBridges.push(this.internalBridge) + this.connectedBridges.set(INTERNAL_BRIDGE_ID, this.internalBridge) } } else { if (this.internalBridge) { this.session.updateBridgeStatus(INTERNAL_BRIDGE_ID, null) - const bridgeIndex = this.connectedBridges.findIndex( - (connectedBridge) => connectedBridge === this.internalBridge - ) - if (bridgeIndex >= 0) { - this.connectedBridges.splice(bridgeIndex, 1) - } const internalBridge = this.internalBridge + this.connectedBridges.delete(INTERNAL_BRIDGE_ID) this.internalBridge = null internalBridge.destroy().catch(this.log.error) } @@ -218,7 +222,7 @@ export class BridgeHandler { if (!_.isEqual(this.mappings, mappings)) { this.mappings = mappings - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.setMappings(mappings) } } @@ -228,20 +232,20 @@ export class BridgeHandler { if (timeline) { this.timelines[timelineId] = timeline - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.addTimeline(timelineId, timeline) } } else { delete this.timelines[timelineId] - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.removeTimeline(timelineId) } } } } updateSettings(bridgeId: BridgeId, settings: Bridge['settings']): void { - const bridgeConnection = this.connectedBridges.find((bc) => bc.bridgeId === bridgeId) + const bridgeConnection = this.connectedBridges.get(bridgeId) if (bridgeConnection) { bridgeConnection.setSettings(settings) } @@ -303,12 +307,12 @@ export class BridgeHandler { modified: number }[] ): void { - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.updateDatastore(updates) } } refreshResources(): void { - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.refreshResources() } } @@ -323,6 +327,8 @@ interface BridgeHandlerCallbacks { interface BridgeConnectionCallbacks extends BridgeHandlerCallbacks { getMappings: () => Mappings getTimelines: () => { [timelineId: string]: TSRTimeline } + onInitialized: (connection: AbstractBridgeConnection) => void + onDisconnected: (connection: AbstractBridgeConnection) => void } abstract class AbstractBridgeConnection { @@ -412,6 +418,7 @@ abstract class AbstractBridgeConnection { } else if (this.bridgeId !== id) { throw new Error(`bridgeId ID mismatch: "${this.bridgeId}" vs "${id}"`) } + this.callbacks.onInitialized(this) if (version !== CURRENT_VERSION) { this.callbacks.onVersionMismatch(id, version, CURRENT_VERSION) @@ -625,6 +632,7 @@ export class WebsocketBridgeConnection extends AbstractBridgeConnection { this.session.updateBridgeStatus(this.bridgeId, status) } + this.callbacks.onDisconnected(this) } }) this.connection.on('message', this.handleMessage.bind(this)) diff --git a/shared/packages/tsr-bridge/src/TSR.ts b/shared/packages/tsr-bridge/src/TSR.ts index b78d8de4..44bf10d6 100644 --- a/shared/packages/tsr-bridge/src/TSR.ts +++ b/shared/packages/tsr-bridge/src/TSR.ts @@ -1,5 +1,12 @@ import _ from 'lodash' -import { Conductor, ConductorOptions, DeviceOptionsAny, DeviceType, OSCDeviceType } from 'timeline-state-resolver' +import { + AbortError, + Conductor, + ConductorOptions, + DeviceOptionsAny, + DeviceType, + OSCDeviceType, +} from 'timeline-state-resolver' import { MetadataAny, ResourceAny, TSRDeviceId, unprotectString } from '@shared/models' import { BridgeAPI, LoggerLike } from '@shared/api' import { CasparCGSideload } from './sideload/CasparCG' @@ -18,7 +25,7 @@ export class TSR { public newConnection = false public conductor: Conductor public send: (message: BridgeAPI.FromBridge.Any) => void - private devices = new Map() + private devices = new Map() private sideLoadedDevices = new Map() @@ -113,10 +120,14 @@ export class TSR { if (!existingDevice || !_.isEqual(existingDevice, newDevice)) { if (existingDevice) { + existingDevice.abortController.abort() await this.conductor.removeDevice(unprotectString(deviceId)) } + await this._removeSideloadDevice(deviceId) - this.devices.set(deviceId, newDevice) + const abortController = new AbortController() + + this.devices.set(deviceId, { ...newDevice, abortController }) this.onDeviceStatus(deviceId, { statusCode: StatusCode.UNKNOWN, messages: ['Initializing'], @@ -128,7 +139,9 @@ export class TSR { this.sideLoadDevice(deviceId, newDevice) // Create the device, but don't initialize it: - const devicePr = this.conductor.createDevice(unprotectString(deviceId), newDevice) + const devicePr = this.conductor.createDevice(unprotectString(deviceId), newDevice, { + signal: abortController.signal, + }) this.onDeviceStatus(deviceId, { active: true, @@ -171,16 +184,21 @@ export class TSR { }) // now initialize it - await this.conductor.initDevice(unprotectString(deviceId), newDevice) + await this.conductor.initDevice(unprotectString(deviceId), newDevice, undefined, { + signal: abortController.signal, + }) this.onDeviceStatus(deviceId, await device.device.getStatus()) - })().catch((error) => this.log.error('TSR device error: ' + stringifyError(error))) + })().catch((error) => { + if (!(error instanceof AbortError)) this.log.error('TSR device error: ' + stringifyError(error)) + }) } } // Removed: - for (const deviceId of this.devices.keys()) { + for (const [deviceId, oldDevice] of this.devices.entries()) { const newDevice = deviceOptions.get(deviceId) if (!newDevice || newDevice.disable) { + oldDevice.abortController.abort() await this._removeDevice(deviceId) this.reportRemovedDevice(deviceId) @@ -192,11 +210,7 @@ export class TSR { } private async _removeDevice(deviceId: TSRDeviceId): Promise { // Delete the sideloaded device, if any - const sideLoadedDevice = this.sideLoadedDevices.get(deviceId) - if (sideLoadedDevice) { - await sideLoadedDevice.close() - this.sideLoadedDevices.delete(deviceId) - } + await this._removeSideloadDevice(deviceId) // HACK: There are some scenarios in which this method will never return. // For example, when trying to remove a CasparCG device that has never connected. @@ -207,6 +221,15 @@ export class TSR { this.devices.delete(deviceId) this.deviceStatus.delete(deviceId) } + + private async _removeSideloadDevice(deviceId: TSRDeviceId) { + const sideLoadedDevice = this.sideLoadedDevices.get(deviceId) + if (sideLoadedDevice) { + await sideLoadedDevice.close() + this.sideLoadedDevices.delete(deviceId) + } + } + public refreshResourcesAndMetadata( cb: (deviceId: TSRDeviceId, resources: ResourceAny[], metadata: MetadataAny) => void ): void {