diff --git a/apps/app/src/electron/bridgeHandler.ts b/apps/app/src/electron/bridgeHandler.ts index 60dc8f34..e98b5aa9 100644 --- a/apps/app/src/electron/bridgeHandler.ts +++ b/apps/app/src/electron/bridgeHandler.ts @@ -33,8 +33,6 @@ import { AnalogInput } from '../models/project/AnalogInput' export const { version: CURRENT_VERSION }: { version: string } = require('../../package.json') export const SERVER_PORT = 5400 -type AnyBridgeConnection = WebsocketBridgeConnection | LocalBridgeConnection - /** This handles connected bridges */ export class BridgeHandler { server: WebsocketServer @@ -48,7 +46,7 @@ export class BridgeHandler { } > = new Map() private internalBridge: LocalBridgeConnection | null = null - private connectedBridges: Array = [] + private connectedBridges: Map = new Map() private mappings: Mappings = {} private timelines: { [timelineId: string]: TSRTimeline } = {} private settings: Map = new Map() @@ -73,6 +71,18 @@ export class BridgeHandler { ...callbacks, getMappings: () => this.mappings, getTimelines: () => this.timelines, + onInitialized: (connection) => { + if (connection.bridgeId == null) return + this.connectedBridges.set(connection.bridgeId, connection) + }, + onDisconnected: (connection) => { + if (connection.bridgeId == null) return + const currentConnection = this.connectedBridges.get(connection.bridgeId) + if (currentConnection === connection) { + // delete only if that's the same connection, otherwise a new one might have been initiated before this one fully closed + this.connectedBridges.delete(connection.bridgeId) + } + }, } this.server = new WebsocketServer(this.log, SERVER_PORT, (connection: WebsocketConnection) => { // On connection: @@ -89,10 +99,9 @@ export class BridgeHandler { for (const [bridgeId, outgoing] of this.outgoingBridges.entries()) { if (outgoing.connection?.connectionId === connection.connectionId) { bridge.bridgeId = bridgeId + this.connectedBridges.set(bridgeId, bridge) } } - - this.connectedBridges.push(bridge) }) this.server.on('close', () => { @@ -108,8 +117,8 @@ export class BridgeHandler { this.reconnectToBridges() }, 1000) } - getBridgeConnection(bridgeId: BridgeId): AnyBridgeConnection | undefined { - return this.connectedBridges.find((b) => b.bridgeId === bridgeId) + getBridgeConnection(bridgeId: BridgeId): AbstractBridgeConnection | undefined { + return this.connectedBridges.get(bridgeId) } async onClose(): Promise { if (this.internalBridge) { @@ -129,18 +138,13 @@ export class BridgeHandler { this.storage, this.connectionCallbacks ) - this.connectedBridges.push(this.internalBridge) + this.connectedBridges.set(INTERNAL_BRIDGE_ID, this.internalBridge) } } else { if (this.internalBridge) { this.session.updateBridgeStatus(INTERNAL_BRIDGE_ID, null) - const bridgeIndex = this.connectedBridges.findIndex( - (connectedBridge) => connectedBridge === this.internalBridge - ) - if (bridgeIndex >= 0) { - this.connectedBridges.splice(bridgeIndex, 1) - } const internalBridge = this.internalBridge + this.connectedBridges.delete(INTERNAL_BRIDGE_ID) this.internalBridge = null internalBridge.destroy().catch(this.log.error) } @@ -218,7 +222,7 @@ export class BridgeHandler { if (!_.isEqual(this.mappings, mappings)) { this.mappings = mappings - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.setMappings(mappings) } } @@ -228,20 +232,20 @@ export class BridgeHandler { if (timeline) { this.timelines[timelineId] = timeline - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.addTimeline(timelineId, timeline) } } else { delete this.timelines[timelineId] - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.removeTimeline(timelineId) } } } } updateSettings(bridgeId: BridgeId, settings: Bridge['settings']): void { - const bridgeConnection = this.connectedBridges.find((bc) => bc.bridgeId === bridgeId) + const bridgeConnection = this.connectedBridges.get(bridgeId) if (bridgeConnection) { bridgeConnection.setSettings(settings) } @@ -303,12 +307,12 @@ export class BridgeHandler { modified: number }[] ): void { - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.updateDatastore(updates) } } refreshResources(): void { - for (const bridgeConnection of this.connectedBridges) { + for (const bridgeConnection of this.connectedBridges.values()) { bridgeConnection.refreshResources() } } @@ -323,6 +327,8 @@ interface BridgeHandlerCallbacks { interface BridgeConnectionCallbacks extends BridgeHandlerCallbacks { getMappings: () => Mappings getTimelines: () => { [timelineId: string]: TSRTimeline } + onInitialized: (connection: AbstractBridgeConnection) => void + onDisconnected: (connection: AbstractBridgeConnection) => void } abstract class AbstractBridgeConnection { @@ -412,6 +418,7 @@ abstract class AbstractBridgeConnection { } else if (this.bridgeId !== id) { throw new Error(`bridgeId ID mismatch: "${this.bridgeId}" vs "${id}"`) } + this.callbacks.onInitialized(this) if (version !== CURRENT_VERSION) { this.callbacks.onVersionMismatch(id, version, CURRENT_VERSION) @@ -625,6 +632,7 @@ export class WebsocketBridgeConnection extends AbstractBridgeConnection { this.session.updateBridgeStatus(this.bridgeId, status) } + this.callbacks.onDisconnected(this) } }) this.connection.on('message', this.handleMessage.bind(this)) diff --git a/shared/packages/tsr-bridge/src/TSR.ts b/shared/packages/tsr-bridge/src/TSR.ts index b78d8de4..3a1746df 100644 --- a/shared/packages/tsr-bridge/src/TSR.ts +++ b/shared/packages/tsr-bridge/src/TSR.ts @@ -1,5 +1,12 @@ import _ from 'lodash' -import { Conductor, ConductorOptions, DeviceOptionsAny, DeviceType, OSCDeviceType } from 'timeline-state-resolver' +import { + AbortError, + Conductor, + ConductorOptions, + DeviceOptionsAny, + DeviceType, + OSCDeviceType, +} from 'timeline-state-resolver' import { MetadataAny, ResourceAny, TSRDeviceId, unprotectString } from '@shared/models' import { BridgeAPI, LoggerLike } from '@shared/api' import { CasparCGSideload } from './sideload/CasparCG' @@ -18,7 +25,7 @@ export class TSR { public newConnection = false public conductor: Conductor public send: (message: BridgeAPI.FromBridge.Any) => void - private devices = new Map() + private devices = new Map() private sideLoadedDevices = new Map() @@ -106,17 +113,21 @@ export class TSR { private async _updateDevices(): Promise { // Added/updated: const deviceOptions = this.deviceOptions - for (const [deviceId, newDevice] of deviceOptions.entries()) { - if (newDevice.disable) continue + for (const [deviceId, newDeviceOptions] of deviceOptions.entries()) { + if (newDeviceOptions.disable) continue const existingDevice = this.devices.get(deviceId) - if (!existingDevice || !_.isEqual(existingDevice, newDevice)) { + if (!existingDevice || !_.isEqual(existingDevice.options, newDeviceOptions)) { if (existingDevice) { + existingDevice.abortController.abort() await this.conductor.removeDevice(unprotectString(deviceId)) } + await this._removeSideloadDevice(deviceId) - this.devices.set(deviceId, newDevice) + const abortController = new AbortController() + + this.devices.set(deviceId, { options: newDeviceOptions, abortController }) this.onDeviceStatus(deviceId, { statusCode: StatusCode.UNKNOWN, messages: ['Initializing'], @@ -125,10 +136,12 @@ export class TSR { // Run async so as not to block other devices from being processed. ;(async () => { - this.sideLoadDevice(deviceId, newDevice) + this.sideLoadDevice(deviceId, newDeviceOptions) // Create the device, but don't initialize it: - const devicePr = this.conductor.createDevice(unprotectString(deviceId), newDevice) + const devicePr = this.conductor.createDevice(unprotectString(deviceId), newDeviceOptions, { + signal: abortController.signal, + }) this.onDeviceStatus(deviceId, { active: true, @@ -171,16 +184,21 @@ export class TSR { }) // now initialize it - await this.conductor.initDevice(unprotectString(deviceId), newDevice) + await this.conductor.initDevice(unprotectString(deviceId), newDeviceOptions, undefined, { + signal: abortController.signal, + }) this.onDeviceStatus(deviceId, await device.device.getStatus()) - })().catch((error) => this.log.error('TSR device error: ' + stringifyError(error))) + })().catch((error) => { + if (!(error instanceof AbortError)) this.log.error('TSR device error: ' + stringifyError(error)) + }) } } // Removed: - for (const deviceId of this.devices.keys()) { + for (const [deviceId, oldDevice] of this.devices.entries()) { const newDevice = deviceOptions.get(deviceId) if (!newDevice || newDevice.disable) { + oldDevice.abortController.abort() await this._removeDevice(deviceId) this.reportRemovedDevice(deviceId) @@ -192,11 +210,7 @@ export class TSR { } private async _removeDevice(deviceId: TSRDeviceId): Promise { // Delete the sideloaded device, if any - const sideLoadedDevice = this.sideLoadedDevices.get(deviceId) - if (sideLoadedDevice) { - await sideLoadedDevice.close() - this.sideLoadedDevices.delete(deviceId) - } + await this._removeSideloadDevice(deviceId) // HACK: There are some scenarios in which this method will never return. // For example, when trying to remove a CasparCG device that has never connected. @@ -207,6 +221,15 @@ export class TSR { this.devices.delete(deviceId) this.deviceStatus.delete(deviceId) } + + private async _removeSideloadDevice(deviceId: TSRDeviceId) { + const sideLoadedDevice = this.sideLoadedDevices.get(deviceId) + if (sideLoadedDevice) { + await sideLoadedDevice.close() + this.sideLoadedDevices.delete(deviceId) + } + } + public refreshResourcesAndMetadata( cb: (deviceId: TSRDeviceId, resources: ResourceAny[], metadata: MetadataAny) => void ): void { @@ -290,7 +313,8 @@ export class TSR { if (status && device) { // Hack to get rid of warnings for UDP OSC devices, which always have an UNKNOWN status code. - const isOscUdp = device.type === DeviceType.OSC && device.options?.type === OSCDeviceType.UDP + const isOscUdp = + device.options.type === DeviceType.OSC && device.options.options?.type === OSCDeviceType.UDP const ok = isOscUdp ? true : status.statusCode === StatusCode.GOOD const message = status.messages?.join(', ') ?? '' this.send({