Skip to content

Commit

Permalink
fix: make bridge and device initialization more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
ianshade committed Oct 26, 2023
1 parent d946c76 commit b5c8c1c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 32 deletions.
48 changes: 28 additions & 20 deletions apps/app/src/electron/bridgeHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import { AnalogInput } from '../models/project/AnalogInput'
export const { version: CURRENT_VERSION }: { version: string } = require('../../package.json')
export const SERVER_PORT = 5400

type AnyBridgeConnection = WebsocketBridgeConnection | LocalBridgeConnection

/** This handles connected bridges */
export class BridgeHandler {
server: WebsocketServer
Expand All @@ -48,7 +46,7 @@ export class BridgeHandler {
}
> = new Map()
private internalBridge: LocalBridgeConnection | null = null
private connectedBridges: Array<AnyBridgeConnection> = []
private connectedBridges: Map<BridgeId, AbstractBridgeConnection> = new Map()
private mappings: Mappings = {}
private timelines: { [timelineId: string]: TSRTimeline } = {}
private settings: Map<BridgeId, Bridge['settings']> = new Map()
Expand All @@ -73,6 +71,18 @@ export class BridgeHandler {
...callbacks,
getMappings: () => this.mappings,
getTimelines: () => this.timelines,
onInitialized: (connection) => {
if (connection.bridgeId == null) return
this.connectedBridges.set(connection.bridgeId, connection)
},
onDisconnected: (connection) => {
if (connection.bridgeId == null) return
const currentConnection = this.connectedBridges.get(connection.bridgeId)
if (currentConnection === connection) {
// delete only if that's the same connection, otherwise a new one might have been initiated before this one fully closed
this.connectedBridges.delete(connection.bridgeId)
}
},
}
this.server = new WebsocketServer(this.log, SERVER_PORT, (connection: WebsocketConnection) => {
// On connection:
Expand All @@ -89,10 +99,9 @@ export class BridgeHandler {
for (const [bridgeId, outgoing] of this.outgoingBridges.entries()) {
if (outgoing.connection?.connectionId === connection.connectionId) {
bridge.bridgeId = bridgeId
this.connectedBridges.set(bridgeId, bridge)
}
}

this.connectedBridges.push(bridge)
})

this.server.on('close', () => {
Expand All @@ -108,8 +117,8 @@ export class BridgeHandler {
this.reconnectToBridges()
}, 1000)
}
getBridgeConnection(bridgeId: BridgeId): AnyBridgeConnection | undefined {
return this.connectedBridges.find((b) => b.bridgeId === bridgeId)
getBridgeConnection(bridgeId: BridgeId): AbstractBridgeConnection | undefined {
return this.connectedBridges.get(bridgeId)
}
async onClose(): Promise<void> {
if (this.internalBridge) {
Expand All @@ -129,18 +138,13 @@ export class BridgeHandler {
this.storage,
this.connectionCallbacks
)
this.connectedBridges.push(this.internalBridge)
this.connectedBridges.set(INTERNAL_BRIDGE_ID, this.internalBridge)
}
} else {
if (this.internalBridge) {
this.session.updateBridgeStatus(INTERNAL_BRIDGE_ID, null)
const bridgeIndex = this.connectedBridges.findIndex(
(connectedBridge) => connectedBridge === this.internalBridge
)
if (bridgeIndex >= 0) {
this.connectedBridges.splice(bridgeIndex, 1)
}
const internalBridge = this.internalBridge
this.connectedBridges.delete(INTERNAL_BRIDGE_ID)
this.internalBridge = null
internalBridge.destroy().catch(this.log.error)
}
Expand Down Expand Up @@ -218,7 +222,7 @@ export class BridgeHandler {
if (!_.isEqual(this.mappings, mappings)) {
this.mappings = mappings

for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.setMappings(mappings)
}
}
Expand All @@ -228,20 +232,20 @@ export class BridgeHandler {
if (timeline) {
this.timelines[timelineId] = timeline

for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.addTimeline(timelineId, timeline)
}
} else {
delete this.timelines[timelineId]

for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.removeTimeline(timelineId)
}
}
}
}
updateSettings(bridgeId: BridgeId, settings: Bridge['settings']): void {
const bridgeConnection = this.connectedBridges.find((bc) => bc.bridgeId === bridgeId)
const bridgeConnection = this.connectedBridges.get(bridgeId)
if (bridgeConnection) {
bridgeConnection.setSettings(settings)
}
Expand Down Expand Up @@ -303,12 +307,12 @@ export class BridgeHandler {
modified: number
}[]
): void {
for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.updateDatastore(updates)
}
}
refreshResources(): void {
for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.refreshResources()
}
}
Expand All @@ -323,6 +327,8 @@ interface BridgeHandlerCallbacks {
interface BridgeConnectionCallbacks extends BridgeHandlerCallbacks {
getMappings: () => Mappings
getTimelines: () => { [timelineId: string]: TSRTimeline }
onInitialized: (connection: AbstractBridgeConnection) => void
onDisconnected: (connection: AbstractBridgeConnection) => void
}

abstract class AbstractBridgeConnection {
Expand Down Expand Up @@ -412,6 +418,7 @@ abstract class AbstractBridgeConnection {
} else if (this.bridgeId !== id) {
throw new Error(`bridgeId ID mismatch: "${this.bridgeId}" vs "${id}"`)
}
this.callbacks.onInitialized(this)

if (version !== CURRENT_VERSION) {
this.callbacks.onVersionMismatch(id, version, CURRENT_VERSION)
Expand Down Expand Up @@ -625,6 +632,7 @@ export class WebsocketBridgeConnection extends AbstractBridgeConnection {

this.session.updateBridgeStatus(this.bridgeId, status)
}
this.callbacks.onDisconnected(this)
}
})
this.connection.on('message', this.handleMessage.bind(this))
Expand Down
47 changes: 35 additions & 12 deletions shared/packages/tsr-bridge/src/TSR.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import _ from 'lodash'
import { Conductor, ConductorOptions, DeviceOptionsAny, DeviceType, OSCDeviceType } from 'timeline-state-resolver'
import {
AbortError,
Conductor,
ConductorOptions,
DeviceOptionsAny,
DeviceType,
OSCDeviceType,
} from 'timeline-state-resolver'
import { MetadataAny, ResourceAny, TSRDeviceId, unprotectString } from '@shared/models'
import { BridgeAPI, LoggerLike } from '@shared/api'
import { CasparCGSideload } from './sideload/CasparCG'
Expand All @@ -18,7 +25,7 @@ export class TSR {
public newConnection = false
public conductor: Conductor
public send: (message: BridgeAPI.FromBridge.Any) => void
private devices = new Map<TSRDeviceId, DeviceOptionsAny>()
private devices = new Map<TSRDeviceId, DeviceOptionsAny & { abortController: AbortController }>()

private sideLoadedDevices = new Map<TSRDeviceId, SideLoadDevice>()

Expand Down Expand Up @@ -113,10 +120,14 @@ export class TSR {

if (!existingDevice || !_.isEqual(existingDevice, newDevice)) {
if (existingDevice) {
existingDevice.abortController.abort()
await this.conductor.removeDevice(unprotectString(deviceId))
}
await this._removeSideloadDevice(deviceId)

this.devices.set(deviceId, newDevice)
const abortController = new AbortController()

this.devices.set(deviceId, { ...newDevice, abortController })
this.onDeviceStatus(deviceId, {
statusCode: StatusCode.UNKNOWN,
messages: ['Initializing'],
Expand All @@ -128,7 +139,9 @@ export class TSR {
this.sideLoadDevice(deviceId, newDevice)

// Create the device, but don't initialize it:
const devicePr = this.conductor.createDevice(unprotectString(deviceId), newDevice)
const devicePr = this.conductor.createDevice(unprotectString(deviceId), newDevice, {
signal: abortController.signal,
})

this.onDeviceStatus(deviceId, {
active: true,
Expand Down Expand Up @@ -171,16 +184,21 @@ export class TSR {
})

// now initialize it
await this.conductor.initDevice(unprotectString(deviceId), newDevice)
await this.conductor.initDevice(unprotectString(deviceId), newDevice, undefined, {
signal: abortController.signal,
})

this.onDeviceStatus(deviceId, await device.device.getStatus())
})().catch((error) => this.log.error('TSR device error: ' + stringifyError(error)))
})().catch((error) => {
if (!(error instanceof AbortError)) this.log.error('TSR device error: ' + stringifyError(error))
})
}
}
// Removed:
for (const deviceId of this.devices.keys()) {
for (const [deviceId, oldDevice] of this.devices.entries()) {
const newDevice = deviceOptions.get(deviceId)
if (!newDevice || newDevice.disable) {
oldDevice.abortController.abort()
await this._removeDevice(deviceId)

this.reportRemovedDevice(deviceId)
Expand All @@ -192,11 +210,7 @@ export class TSR {
}
private async _removeDevice(deviceId: TSRDeviceId): Promise<void> {
// Delete the sideloaded device, if any
const sideLoadedDevice = this.sideLoadedDevices.get(deviceId)
if (sideLoadedDevice) {
await sideLoadedDevice.close()
this.sideLoadedDevices.delete(deviceId)
}
await this._removeSideloadDevice(deviceId)

// HACK: There are some scenarios in which this method will never return.
// For example, when trying to remove a CasparCG device that has never connected.
Expand All @@ -207,6 +221,15 @@ export class TSR {
this.devices.delete(deviceId)
this.deviceStatus.delete(deviceId)
}

private async _removeSideloadDevice(deviceId: TSRDeviceId) {
const sideLoadedDevice = this.sideLoadedDevices.get(deviceId)
if (sideLoadedDevice) {
await sideLoadedDevice.close()
this.sideLoadedDevices.delete(deviceId)
}
}

public refreshResourcesAndMetadata(
cb: (deviceId: TSRDeviceId, resources: ResourceAny[], metadata: MetadataAny) => void
): void {
Expand Down

0 comments on commit b5c8c1c

Please sign in to comment.