Skip to content

Commit

Permalink
Merge pull request #2508 from flowforge/agent-tunnel-backport
Browse files Browse the repository at this point in the history
Improve error handling around Device Agent tunnels - backport
  • Loading branch information
hardillb authored Jul 20, 2023
2 parents 39ecdce + bece171 commit 43f5097
Show file tree
Hide file tree
Showing 10 changed files with 561 additions and 69 deletions.
29 changes: 29 additions & 0 deletions docs/contribute/workflows/device-editor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
---
navTitle: Device Editor
---

# Enabling the device editor

```mermaid
sequenceDiagram
User->>FrontEnd: Clicks 'open editor' against device
FrontEnd->>+Forge: PUT /api/v1/devices/:id/editor { tunnel: 'enable' }
Forge->Forge: Generates <token>
Forge--)Device: Publishes command to establish connection with <token>
Device--)Forge: WS Connect /api/v1/devices/:id/editor/comms/:token
Forge->>-FrontEnd: Returns session identifier
FrontEnd->>FrontEnd: Opens /device/<id>/editor/
FrontEnd-->+Forge: Sends requests to /device/<id>/editor/**
Forge--)+Device: Request proxied over WebSocket
Device-->>Editor: Performs request on local Node-RED
Editor-->>Device: Returns response
Device-->>-Forge: Streams response back
Forge-->>-FrontEnd: Streams response back
User->>FrontEnd: User navigates away
FrontEnd-->Forge: Node-RED WebSocket closes
Note over Forge: if no active WebSockets for this device
Forge--)Device: Close WebSocket
```


114 changes: 78 additions & 36 deletions forge/ee/lib/deviceEditor/DeviceTunnelManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
/**
* A DeviceTunnel object keeps track of connections to a device.
* @typedef {Object} DeviceTunnel
* @property {numner} id - A unique identifier for this tunnel instance
* @property {string} deviceId - Device ID
* @property {number} counter - Number of active connections
* @property {number} nextRequestId - Next available request id
* @property {SocketStream} socket - Socket connection to device
* @property {Array<FastifyRequest>} requests - List of pending requests
* @property {Object} forwardedWS - List of forwarded websocket connections
Expand Down Expand Up @@ -46,6 +47,12 @@ class DeviceTunnelManager {
/** @type {ForgeApplication} Forge application (Fastify app) */
this.app = app
this.#tunnels = new Map()

this.app.addHook('onClose', async (_) => {
Object.keys(this.#tunnels).forEach(deviceId => {
this.closeTunnel(deviceId)
})
})
}

/**
Expand All @@ -58,7 +65,6 @@ class DeviceTunnelManager {
* @param {String} token Token to use for tunnel
* @see DeviceTunnelManager#initTunnel
* @see DeviceTunnelManager#closeTunnel
* @see DeviceTunnelManager#removeTunnel
*/
newTunnel (deviceId, token) {
const manager = this
Expand All @@ -67,7 +73,7 @@ class DeviceTunnelManager {
manager.closeTunnel(deviceId)

// create a new tunnel object & add to list
manager.#tunnels.set(deviceId, newTunnel(deviceId, token))
manager.#tunnels.set(deviceId, createNewTunnel(deviceId, token))

return !!manager.#getTunnel(deviceId)
}
Expand All @@ -93,36 +99,33 @@ class DeviceTunnelManager {
getTunnelStatus (deviceId) {
const exists = this.#tunnels.has(deviceId)
if (!exists) {
return null
return { enabled: false }
}
const url = this.getTunnelUrl(deviceId, true)
const url = this.getTunnelUrl(deviceId)
const enabled = this.isEnabled(deviceId)
const connected = this.isConnected(deviceId)
return { url, enabled, connected }
}

closeTunnel (deviceId) {
const tunnel = this.#getTunnel(deviceId)
if (tunnel?.socket) {
tunnel.socket.close()
tunnel.socket.removeAllListeners()
if (tunnel) {
tunnel.socket?.close()
// Close all of the editor websockets that were using this tunnel
Object.keys(tunnel?.forwardedWS).forEach(reqId => {
const wsClient = tunnel.forwardedWS[reqId]
wsClient.close()
})
}
this.removeTunnel(deviceId)
}

removeTunnel (deviceId) {
if (this.#tunnels.has(deviceId)) {
return this.#tunnels.delete(deviceId)
}
}

getTunnelUrl (deviceId, includeToken = false) {
getTunnelUrl (deviceId) {
const tunnel = this.#getTunnel(deviceId)
if (tunnel) {
if (includeToken) {
return `/api/v1/devices/${deviceId}/editor/proxy/?access_token=${tunnel.token}`
}
return `/api/v1/devices/${deviceId}/editor/proxy/`
return `/api/v1/devices/${deviceId}/editor/proxy/?access_token=${tunnel.token}`
}
return ''
}
Expand Down Expand Up @@ -152,15 +155,13 @@ class DeviceTunnelManager {
return false
}

// ensure tunnel is not already open
// Close any existing tunnel
if (tunnel.socket) {
tunnel.socket.close()
tunnel.socket.removeAllListeners()
}

tunnel.socket = inboundDeviceConnection.socket

// handle messages from device
// Handle messages sent from the device
tunnel.socket.on('message', msg => {
const response = JSON.parse(msg.toString())
if (response.id === undefined) {
Expand All @@ -178,22 +179,48 @@ class DeviceTunnelManager {
reply.send()
}
} else if (response.ws) {
// Send message to device editor websocket
const wsSocket = tunnel.forwardedWS[response.id]
wsSocket.send(response.body)
if (wsSocket) {
if (response.closed) {
// The runtime has closed this session's websocket on the device
// Pass that back to the editor so it knows something is up
if (wsSocket) {
wsSocket.close()
}
delete tunnel.forwardedWS[response.id]
} else {
// Send message to device editor websocket
wsSocket.send(response.body)
}
} else {
// This is a message for a editor we don't know about.
// This can happen with Device Agent <= 1.9.4 if multiple
// editors were opened in a single session and then one
// of them is closed. Older Agents don't know to disconnect
// their local comms link for the closed editor, so continue
// sending messages to everyone who was ever connected
}
} else {
// TODO: remove/change temp debug
console.warn('device editor websocket message has no reply')
}
})

tunnel.socket.on('close', () => {
manager.removeTunnel(deviceId)
// The ws connection from the device has closed.
delete tunnel.socket

// Close all of the editor websockets
for (const [id, wsSocket] of Object.entries(tunnel.forwardedWS)) {
wsSocket.close()
delete tunnel.forwardedWS[id]
}
this.app.log.info(`Device ${deviceId} tunnel closed. id:${tunnel.id}`)
})

/** @type {httpHandler} */
tunnel._handleHTTPGet = (request, reply) => {
const id = tunnel.counter++
const id = tunnel.nextRequestId++
tunnel.requests[id] = reply
tunnel.socket.send(JSON.stringify({
id,
Expand All @@ -208,7 +235,7 @@ class DeviceTunnelManager {
tunnel._handleHTTPGet(request, reply)
return
}
const requestId = tunnel.counter++
const requestId = tunnel.nextRequestId++
tunnel.requests[requestId] = reply
tunnel.socket.send(JSON.stringify({
id: requestId,
Expand All @@ -220,7 +247,8 @@ class DeviceTunnelManager {
}

tunnel._handleWS = (connection, request) => {
const requestId = tunnel.counter++
// A new editor websocket is connecting
const requestId = tunnel.nextRequestId++
tunnel.socket.send(JSON.stringify({
id: requestId,
ws: true,
Expand All @@ -230,22 +258,34 @@ class DeviceTunnelManager {
const wsToDevice = connection.socket
tunnel.forwardedWS[requestId] = wsToDevice

// forward messages from device to client
this.app.log.info(`Device ${deviceId} tunnel id:${tunnel.id} - new editor connection req:${requestId} `)

wsToDevice.on('message', msg => {
// Forward messages sent by the editor down to the device
// console.log(`[${tunnel.id}] [${requestId}] E>R`, msg.toString())
tunnel.socket.send(JSON.stringify({
id: requestId,
ws: true,
body: msg.toString()
}))
})

connection.on('close', () => {
if (tunnel.forwardedWS[requestId]) {
tunnel.forwardedWS[requestId].close()
wsToDevice.on('close', msg => {
this.app.log.info(`Device ${deviceId} tunnel id:${tunnel.id} - editor connection closed req:${requestId} `)
// The editor has closed its websocket. Send notification to the
// device so it can close its corresponing connection
// console.log(`[${tunnel.id}] [${requestId}] E>R closed`)
if (tunnel.forwardedWS[requestId] && tunnel.socket) {
// console.log(`[${tunnel.id}] [${requestId}] E>R closed - notifying the device`)
tunnel.socket.send(JSON.stringify({
id: requestId,
ws: true,
closed: true
}))
delete tunnel.forwardedWS[requestId]
}
delete tunnel.forwardedWS[requestId]
})
}
this.app.log.info(`Device ${deviceId} tunnel connected. id:${tunnel.id}`)
return true
}

Expand Down Expand Up @@ -304,19 +344,21 @@ class DeviceTunnelManager {
}
}

let tunnelCounter = 0
/**
* Create new tunnel
* @param {String} deviceId Device ID
* @param {String} token Editor access token
* @returns {DeviceTunnel}
* @memberof DeviceTunnelManager
* @static
* @method newTunnel
* @method createNewTunnel
*/
function newTunnel (deviceId, token) {
function createNewTunnel (deviceId, token) {
const tunnel = {
id: ++tunnelCounter,
deviceId,
counter: 0,
nextRequestId: 1,
socket: null,
requests: {},
forwardedWS: {},
Expand Down
Loading

0 comments on commit 43f5097

Please sign in to comment.