Skip to content

Commit

Permalink
feat: allow sequential executionMode to paralelize multiple queues of…
Browse files Browse the repository at this point in the history
… Commands
  • Loading branch information
jstarpl committed Sep 5, 2024
1 parent 81af9f7 commit 84a53cd
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 15 deletions.
1 change: 1 addition & 0 deletions packages/timeline-state-resolver-types/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,6 @@ export interface ActionExecutionResult<ResultData = undefined> {

export enum ActionExecutionResultCode {
Error = 'ERROR',
IgnoredNotRelevant = 'IGNORED',
Ok = 'OK',
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ describe('HTTP-Send', () => {
content: content,
layer: 'layer0',
},
queueId: undefined,
},
])
await Promise.all(commands.map(async (c) => device.sendCommand(c)))
Expand All @@ -362,6 +363,7 @@ describe('HTTP-Send', () => {
content: content,
layer: 'layer0',
},
queueId: undefined,
},
])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export class HTTPSendDevice extends Device<HTTPSendOptions, HttpSendDeviceState,
content: newLayer.content as HTTPSendCommandContent,
layer: layerKey,
},
queueId: (newLayer.content as HTTPSendCommandContent)?.queueId,
})
} else {
// changed?
Expand All @@ -149,6 +150,7 @@ export class HTTPSendDevice extends Device<HTTPSendOptions, HttpSendDeviceState,
content: newLayer.content as HTTPSendCommandContent,
layer: layerKey,
},
queueId: (newLayer.content as HTTPSendCommandContent)?.queueId,
})
}
}
Expand All @@ -162,6 +164,7 @@ export class HTTPSendDevice extends Device<HTTPSendOptions, HttpSendDeviceState,
timelineObjId: oldLayer.id,
context: `removed: ${oldLayer.id}`,
command: { commandName: 'removed', content: oldLayer.content as HTTPSendCommandContent, layer: layerKey },
queueId: (oldLayer.content as HTTPSendCommandContent)?.queueId,
})
}
})
Expand Down Expand Up @@ -194,7 +197,7 @@ export class HTTPSendDevice extends Device<HTTPSendOptions, HttpSendDeviceState,
const trackedHash = this.trackedState.get(command.layer)
if (commandHash !== trackedHash)
return {
result: ActionExecutionResultCode.Error,
result: ActionExecutionResultCode.IgnoredNotRelevant,
} // command is no longer relevant to state
}
if (this._terminated) {
Expand Down Expand Up @@ -254,7 +257,7 @@ export class HTTPSendDevice extends Device<HTTPSendOptions, HttpSendDeviceState,

const response = await httpReq(command.content.url, options)

if (response.statusCode === 200) {
if (response.statusCode >= 200 && response.statusCode < 300) {
this.context.logger.debug(
`HTTPSend: ${command.content.type}: Good statuscode response on url "${command.content.url}": ${response.statusCode} (${context})`
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,10 @@ export class QuantelManager extends EventEmitter {
})
}
public clearAllWaitWithPort(portId: string) {
if (!this._waitWithPorts[portId]) {
_.each(this._waitWithPorts[portId], (fcn) => {
fcn(true)
})
}
if (!this._waitWithPorts[portId]) return
_.each(this._waitWithPorts[portId], (fcn) => {
fcn(true)
})
}
/**
* Returns true if the wait was cleared from someone else
Expand Down
23 changes: 15 additions & 8 deletions packages/timeline-state-resolver/src/service/commandExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as _ from 'underscore'
import { BaseDeviceAPI, CommandWithContext } from './device'
import { Measurement } from './measure'
import { StateHandlerContext } from './stateHandler'
Expand Down Expand Up @@ -50,15 +51,21 @@ export class CommandExecutor<DeviceState, Command extends CommandWithContext> {
): Promise<void> {
const start = Date.now() // note - would be better to use monotonic time here but BigInt's are annoying

for (const command of commands || []) {
const timeToWait = totalTime - (Date.now() - start)
if (timeToWait > 0) await wait(timeToWait)
const commandQueues = _.groupBy(commands || [], (command) => command.queueId ?? '$$default')

measurement?.executeCommand(command)
await this.sendCommand(command).catch((e) => {
this.logger.error('Error while executing command', e)
await Promise.allSettled(
Object.values<Command[]>(commandQueues).map(async (commandsInQueue): Promise<void> => {
for (const command of commandsInQueue) {
const timeToWait = totalTime - (Date.now() - start)
if (timeToWait > 0) await wait(timeToWait)

measurement?.executeCommand(command)
await this.sendCommand(command).catch((e) => {
this.logger.error('Error while executing command', e)
})
measurement?.finishedCommandExecution(command)
}
})
measurement?.finishedCommandExecution(command)
}
)
}
}
2 changes: 2 additions & 0 deletions packages/timeline-state-resolver/src/service/device.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export type CommandWithContext = {
timelineObjId: string
/** this command is to be executed x ms _before_ the scheduled time */
preliminary?: number
/** commands with different queueId's can be executed in paralel in sequential mode */
queueId?: string
}

/**
Expand Down

0 comments on commit 84a53cd

Please sign in to comment.