From 19990168f96d33c3d88f1c0c9ed303efc4cf15a0 Mon Sep 17 00:00:00 2001 From: csmig Date: Tue, 11 Jun 2024 05:19:00 -0400 Subject: [PATCH 1/4] feat: schedule scan based on queue events --- lib/scan.js | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/lib/scan.js b/lib/scan.js index 0b96eea..6cc9a6b 100644 --- a/lib/scan.js +++ b/lib/scan.js @@ -1,6 +1,7 @@ import { logger } from './logger.js' import { options } from './args.js' import { queue as parseQueue} from './parse.js' +import { cargoQueue } from './cargo.js' import { serializeError } from 'serialize-error' import fg from 'fast-glob' import lineByLine from 'n-readlines' @@ -11,11 +12,37 @@ const component = 'scan' const historySet = new Set() // in memory history set let isWriteScheduled = false // flag to indicate if there is pending files to write to the history file let timeoutId // id of the active setTimeout +let isParseQueueActive = false +let isCargoQueueActive = false + +function testScheduleNextScan() { + if (!isParseQueueActive && !isCargoQueueActive && !options.oneShot) { + scheduleNextScan() + } +} /** * Utility function that calls initHistory() and startScanner() */ function initScanner() { + parseQueue.on('task_queued', () => { + logger.info({ component, message: `parse_queue_task_queued` }) + isParseQueueActive = true + }) + parseQueue.on('drain', () => { + logger.info({ component, message: `parse_queue_drained` }) + isParseQueueActive = false + testScheduleNextScan() + }) + cargoQueue.on('task_queued', () => { + logger.info({ component, message: `cargo_queue_task_queued` }) + isCargoQueueActive = true + }) + cargoQueue.on('drain', () => { + logger.info({ component, message: `cargo_queue_drained` }) + isCargoQueueActive = false + testScheduleNextScan() + }) initHistory() startScanner() Alarm.on('alarmRaised', onAlarmRaised) @@ -53,18 +80,11 @@ async function startScanner() { //Remove stale files: those in historySet but not found in the current scan removeStaleFiles(discoveredFiles) logger.info({ component, message: `scan ended`, path: options.path }) + testScheduleNextScan() } catch (e) { logger.error({ component, error: serializeError(e) }) } - finally { - if (!options.oneShot) { - scheduleNextScan() - } - else { - logger.info({ component, message: `one-shot scan completed`, path: options.path }) - } - } } /** From 5501c15df4fa91fb6f650ca6a1312a9adc114af7 Mon Sep 17 00:00:00 2001 From: csmig Date: Wed, 12 Jun 2024 08:34:34 -0400 Subject: [PATCH 2/4] pause/resume queues on alarms --- lib/scan.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/scan.js b/lib/scan.js index 6cc9a6b..377daab 100644 --- a/lib/scan.js +++ b/lib/scan.js @@ -350,7 +350,8 @@ function onAlarmRaised(alarmType) { message: `handling raised alarm`, alarmType }) - cancelNextScan() + parseQueue.pause() + cargoQueue.pause() } /** @@ -364,7 +365,8 @@ function onAlarmLowered(alarmType) { message: `handling lowered alarm`, alarmType }) - startScanner() + parseQueue.resume() + cargoQueue.resume() } From ae8fd2d61c441f936538324f9baf2196e52d142a Mon Sep 17 00:00:00 2001 From: csmig Date: Wed, 12 Jun 2024 14:17:13 -0400 Subject: [PATCH 3/4] event handlers fire before call to testScheduleNextScan(); remove unused function --- lib/scan.js | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/lib/scan.js b/lib/scan.js index 377daab..a7ff9e2 100644 --- a/lib/scan.js +++ b/lib/scan.js @@ -80,7 +80,8 @@ async function startScanner() { //Remove stale files: those in historySet but not found in the current scan removeStaleFiles(discoveredFiles) logger.info({ component, message: `scan ended`, path: options.path }) - testScheduleNextScan() + // allow queue event handlers to fire before calling testScheduleNextScan() + setTimeout(testScheduleNextScan, 0) } catch (e) { logger.error({ component, error: serializeError(e) }) @@ -117,19 +118,6 @@ function scheduleNextScan() { }) } -/** - * Cancels the next scan and logs. - * References options properties {path}. - */ -function cancelNextScan() { - clearTimeout(timeoutId) - logger.info({ - component, - message: `scan cancelled`, - path: options.path - }) -} - /** * Returns immediately if options.historyFile is falsy. * Initializes the history Set by reading it from a file and adding each line to the history set. From 51a89d86ce8d62df0b424f8c2cab725b5a5dd29a Mon Sep 17 00:00:00 2001 From: csmig Date: Wed, 12 Jun 2024 14:55:55 -0400 Subject: [PATCH 4/4] moved handlers into separate function and added commentary --- lib/scan.js | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/lib/scan.js b/lib/scan.js index a7ff9e2..35dc817 100644 --- a/lib/scan.js +++ b/lib/scan.js @@ -11,38 +11,50 @@ import Alarm from './alarm.js' const component = 'scan' const historySet = new Set() // in memory history set let isWriteScheduled = false // flag to indicate if there is pending files to write to the history file -let timeoutId // id of the active setTimeout -let isParseQueueActive = false -let isCargoQueueActive = false +let isParseQueueActive = false // flag to indicate if parseQueue has pending work +let isCargoQueueActive = false // flag to indicate if cargoQueue has pending work -function testScheduleNextScan() { +/** + * Schedules the next scan if we're not in oneShot operation and the queues are idle + */ +function tryScheduleNextScan() { if (!isParseQueueActive && !isCargoQueueActive && !options.oneShot) { scheduleNextScan() } } /** - * Utility function that calls initHistory() and startScanner() + * - Attaches handlers for task_queued and drain events on the queues + * - Sets the flags that track if the queues have pending work + * - on drain events, try to schedule the next scan */ -function initScanner() { +function initQueueEvents() { parseQueue.on('task_queued', () => { - logger.info({ component, message: `parse_queue_task_queued` }) + logger.verbose({ component, message: `handling parseQueue event`, event: 'task_queued' }) isParseQueueActive = true }) parseQueue.on('drain', () => { - logger.info({ component, message: `parse_queue_drained` }) + logger.verbose({ component, message: `handling parseQueue event`, event: 'drained' }) isParseQueueActive = false - testScheduleNextScan() + tryScheduleNextScan() }) cargoQueue.on('task_queued', () => { - logger.info({ component, message: `cargo_queue_task_queued` }) + logger.verbose({ component, message: `handling cargoQueue event`, event: 'task_queued' }) isCargoQueueActive = true }) cargoQueue.on('drain', () => { - logger.info({ component, message: `cargo_queue_drained` }) + logger.verbose({ component, message: `handling cargoQueue event`, event: 'drained' }) isCargoQueueActive = false - testScheduleNextScan() + tryScheduleNextScan() }) +} + +/** + * Utility function that calls initQueueEvents(), initHistory() and startScanner() + * Attaches handlers for alarmRaised and alarmLowered + */ +function initScanner() { + initQueueEvents() initHistory() startScanner() Alarm.on('alarmRaised', onAlarmRaised) @@ -80,8 +92,8 @@ async function startScanner() { //Remove stale files: those in historySet but not found in the current scan removeStaleFiles(discoveredFiles) logger.info({ component, message: `scan ended`, path: options.path }) - // allow queue event handlers to fire before calling testScheduleNextScan() - setTimeout(testScheduleNextScan, 0) + // allow queue event handlers to execute before calling tryScheduleNextScan() + setTimeout(tryScheduleNextScan, 0) } catch (e) { logger.error({ component, error: serializeError(e) }) @@ -104,7 +116,7 @@ function removeStaleFiles(currentFilesSet){ * References options properties {path, scanInterval}. */ function scheduleNextScan() { - timeoutId = setTimeout(() => { + setTimeout(() => { startScanner().catch(e => { logger.error({ component, error: serializeError(e) }) })