Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make bridge and device initialization more robust #176

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions apps/app/src/electron/bridgeHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import { AnalogInput } from '../models/project/AnalogInput'
export const { version: CURRENT_VERSION }: { version: string } = require('../../package.json')
export const SERVER_PORT = 5400

type AnyBridgeConnection = WebsocketBridgeConnection | LocalBridgeConnection

/** This handles connected bridges */
export class BridgeHandler {
server: WebsocketServer
Expand All @@ -48,7 +46,7 @@ export class BridgeHandler {
}
> = new Map()
private internalBridge: LocalBridgeConnection | null = null
private connectedBridges: Array<AnyBridgeConnection> = []
private connectedBridges: Map<BridgeId, AbstractBridgeConnection> = new Map()
private mappings: Mappings = {}
private timelines: { [timelineId: string]: TSRTimeline } = {}
private settings: Map<BridgeId, Bridge['settings']> = new Map()
Expand All @@ -73,6 +71,18 @@ export class BridgeHandler {
...callbacks,
getMappings: () => this.mappings,
getTimelines: () => this.timelines,
onInitialized: (connection) => {
if (connection.bridgeId == null) return
this.connectedBridges.set(connection.bridgeId, connection)
},
onDisconnected: (connection) => {
if (connection.bridgeId == null) return
const currentConnection = this.connectedBridges.get(connection.bridgeId)
if (currentConnection === connection) {
// delete only if that's the same connection, otherwise a new one might have been initiated before this one fully closed
this.connectedBridges.delete(connection.bridgeId)
}
},
}
this.server = new WebsocketServer(this.log, SERVER_PORT, (connection: WebsocketConnection) => {
// On connection:
Expand All @@ -89,10 +99,9 @@ export class BridgeHandler {
for (const [bridgeId, outgoing] of this.outgoingBridges.entries()) {
if (outgoing.connection?.connectionId === connection.connectionId) {
bridge.bridgeId = bridgeId
this.connectedBridges.set(bridgeId, bridge)
}
}

this.connectedBridges.push(bridge)
})

this.server.on('close', () => {
Expand All @@ -108,8 +117,8 @@ export class BridgeHandler {
this.reconnectToBridges()
}, 1000)
}
getBridgeConnection(bridgeId: BridgeId): AnyBridgeConnection | undefined {
return this.connectedBridges.find((b) => b.bridgeId === bridgeId)
getBridgeConnection(bridgeId: BridgeId): AbstractBridgeConnection | undefined {
return this.connectedBridges.get(bridgeId)
}
async onClose(): Promise<void> {
if (this.internalBridge) {
Expand All @@ -129,18 +138,13 @@ export class BridgeHandler {
this.storage,
this.connectionCallbacks
)
this.connectedBridges.push(this.internalBridge)
this.connectedBridges.set(INTERNAL_BRIDGE_ID, this.internalBridge)
}
} else {
if (this.internalBridge) {
this.session.updateBridgeStatus(INTERNAL_BRIDGE_ID, null)
const bridgeIndex = this.connectedBridges.findIndex(
(connectedBridge) => connectedBridge === this.internalBridge
)
if (bridgeIndex >= 0) {
this.connectedBridges.splice(bridgeIndex, 1)
}
const internalBridge = this.internalBridge
this.connectedBridges.delete(INTERNAL_BRIDGE_ID)
this.internalBridge = null
internalBridge.destroy().catch(this.log.error)
}
Expand Down Expand Up @@ -218,7 +222,7 @@ export class BridgeHandler {
if (!_.isEqual(this.mappings, mappings)) {
this.mappings = mappings

for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.setMappings(mappings)
}
}
Expand All @@ -228,20 +232,20 @@ export class BridgeHandler {
if (timeline) {
this.timelines[timelineId] = timeline

for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.addTimeline(timelineId, timeline)
}
} else {
delete this.timelines[timelineId]

for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.removeTimeline(timelineId)
}
}
}
}
updateSettings(bridgeId: BridgeId, settings: Bridge['settings']): void {
const bridgeConnection = this.connectedBridges.find((bc) => bc.bridgeId === bridgeId)
const bridgeConnection = this.connectedBridges.get(bridgeId)
if (bridgeConnection) {
bridgeConnection.setSettings(settings)
}
Expand Down Expand Up @@ -303,12 +307,12 @@ export class BridgeHandler {
modified: number
}[]
): void {
for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.updateDatastore(updates)
}
}
refreshResources(): void {
for (const bridgeConnection of this.connectedBridges) {
for (const bridgeConnection of this.connectedBridges.values()) {
bridgeConnection.refreshResources()
}
}
Expand All @@ -323,6 +327,8 @@ interface BridgeHandlerCallbacks {
interface BridgeConnectionCallbacks extends BridgeHandlerCallbacks {
getMappings: () => Mappings
getTimelines: () => { [timelineId: string]: TSRTimeline }
onInitialized: (connection: AbstractBridgeConnection) => void
onDisconnected: (connection: AbstractBridgeConnection) => void
}

abstract class AbstractBridgeConnection {
Expand Down Expand Up @@ -412,6 +418,7 @@ abstract class AbstractBridgeConnection {
} else if (this.bridgeId !== id) {
throw new Error(`bridgeId ID mismatch: "${this.bridgeId}" vs "${id}"`)
}
this.callbacks.onInitialized(this)

if (version !== CURRENT_VERSION) {
this.callbacks.onVersionMismatch(id, version, CURRENT_VERSION)
Expand Down Expand Up @@ -625,6 +632,7 @@ export class WebsocketBridgeConnection extends AbstractBridgeConnection {

this.session.updateBridgeStatus(this.bridgeId, status)
}
this.callbacks.onDisconnected(this)
}
})
this.connection.on('message', this.handleMessage.bind(this))
Expand Down
58 changes: 41 additions & 17 deletions shared/packages/tsr-bridge/src/TSR.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import _ from 'lodash'
import { Conductor, ConductorOptions, DeviceOptionsAny, DeviceType, OSCDeviceType } from 'timeline-state-resolver'
import {
AbortError,
Conductor,
ConductorOptions,
DeviceOptionsAny,
DeviceType,
OSCDeviceType,
} from 'timeline-state-resolver'
import { MetadataAny, ResourceAny, TSRDeviceId, unprotectString } from '@shared/models'
import { BridgeAPI, LoggerLike } from '@shared/api'
import { CasparCGSideload } from './sideload/CasparCG'
Expand All @@ -18,7 +25,7 @@ export class TSR {
public newConnection = false
public conductor: Conductor
public send: (message: BridgeAPI.FromBridge.Any) => void
private devices = new Map<TSRDeviceId, DeviceOptionsAny>()
private devices = new Map<TSRDeviceId, { abortController: AbortController; options: DeviceOptionsAny }>()

private sideLoadedDevices = new Map<TSRDeviceId, SideLoadDevice>()

Expand Down Expand Up @@ -106,17 +113,21 @@ export class TSR {
private async _updateDevices(): Promise<void> {
// Added/updated:
const deviceOptions = this.deviceOptions
for (const [deviceId, newDevice] of deviceOptions.entries()) {
if (newDevice.disable) continue
for (const [deviceId, newDeviceOptions] of deviceOptions.entries()) {
if (newDeviceOptions.disable) continue

const existingDevice = this.devices.get(deviceId)

if (!existingDevice || !_.isEqual(existingDevice, newDevice)) {
if (!existingDevice || !_.isEqual(existingDevice.options, newDeviceOptions)) {
if (existingDevice) {
existingDevice.abortController.abort()
await this.conductor.removeDevice(unprotectString(deviceId))
}
await this._removeSideloadDevice(deviceId)

this.devices.set(deviceId, newDevice)
const abortController = new AbortController()

this.devices.set(deviceId, { options: newDeviceOptions, abortController })
this.onDeviceStatus(deviceId, {
statusCode: StatusCode.UNKNOWN,
messages: ['Initializing'],
Expand All @@ -125,10 +136,12 @@ export class TSR {

// Run async so as not to block other devices from being processed.
;(async () => {
this.sideLoadDevice(deviceId, newDevice)
this.sideLoadDevice(deviceId, newDeviceOptions)

// Create the device, but don't initialize it:
const devicePr = this.conductor.createDevice(unprotectString(deviceId), newDevice)
const devicePr = this.conductor.createDevice(unprotectString(deviceId), newDeviceOptions, {
signal: abortController.signal,
})

this.onDeviceStatus(deviceId, {
active: true,
Expand Down Expand Up @@ -171,16 +184,21 @@ export class TSR {
})

// now initialize it
await this.conductor.initDevice(unprotectString(deviceId), newDevice)
await this.conductor.initDevice(unprotectString(deviceId), newDeviceOptions, undefined, {
signal: abortController.signal,
})

this.onDeviceStatus(deviceId, await device.device.getStatus())
})().catch((error) => this.log.error('TSR device error: ' + stringifyError(error)))
})().catch((error) => {
if (!(error instanceof AbortError)) this.log.error('TSR device error: ' + stringifyError(error))
})
}
}
// Removed:
for (const deviceId of this.devices.keys()) {
for (const [deviceId, oldDevice] of this.devices.entries()) {
const newDevice = deviceOptions.get(deviceId)
if (!newDevice || newDevice.disable) {
oldDevice.abortController.abort()
await this._removeDevice(deviceId)

this.reportRemovedDevice(deviceId)
Expand All @@ -192,11 +210,7 @@ export class TSR {
}
private async _removeDevice(deviceId: TSRDeviceId): Promise<void> {
// Delete the sideloaded device, if any
const sideLoadedDevice = this.sideLoadedDevices.get(deviceId)
if (sideLoadedDevice) {
await sideLoadedDevice.close()
this.sideLoadedDevices.delete(deviceId)
}
await this._removeSideloadDevice(deviceId)

// HACK: There are some scenarios in which this method will never return.
// For example, when trying to remove a CasparCG device that has never connected.
Expand All @@ -207,6 +221,15 @@ export class TSR {
this.devices.delete(deviceId)
this.deviceStatus.delete(deviceId)
}

private async _removeSideloadDevice(deviceId: TSRDeviceId) {
const sideLoadedDevice = this.sideLoadedDevices.get(deviceId)
if (sideLoadedDevice) {
await sideLoadedDevice.close()
this.sideLoadedDevices.delete(deviceId)
}
}

public refreshResourcesAndMetadata(
cb: (deviceId: TSRDeviceId, resources: ResourceAny[], metadata: MetadataAny) => void
): void {
Expand Down Expand Up @@ -290,7 +313,8 @@ export class TSR {

if (status && device) {
// Hack to get rid of warnings for UDP OSC devices, which always have an UNKNOWN status code.
const isOscUdp = device.type === DeviceType.OSC && device.options?.type === OSCDeviceType.UDP
const isOscUdp =
device.options.type === DeviceType.OSC && device.options.options?.type === OSCDeviceType.UDP
const ok = isOscUdp ? true : status.statusCode === StatusCode.GOOD
const message = status.messages?.join(', ') ?? ''
this.send({
Expand Down
Loading