Skip to content

Commit

Permalink
last commit for the better-queue solution
Browse files Browse the repository at this point in the history
  • Loading branch information
Matte22 committed Feb 13, 2024
1 parent f2c630f commit 6e8cdb0
Show file tree
Hide file tree
Showing 11 changed files with 71,831 additions and 145 deletions.
6 changes: 3 additions & 3 deletions lib/args.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ program
.option('--add-existing', 'For `--mode events`, existing files in the path will generate an `add` event (`WATCHER_ADD_EXISTING=1`). Ignored if `--mode scan`, negate with `--no-add-existing`.', getBoolean('WATCHER_ADD_EXISTING', false))
.option('--no-add-existing', 'Ignore existing files in the watched path (`WATCHER_ADD_EXISTING=0`).')
.option('--cargo-delay <ms>', 'Milliseconds to delay processing the queue (`WATCHER_CARGO_DELAY`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_CARGO_DELAY) ?? 2000)
.option('--history-cargo-delay <ms>', 'Milliseconds to delay processing the history file write queue (`WATCHER_HISTORY_CARGO_DELAY`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_HISTORY_CARGO_DELAY) ?? 2000)
.option('--cargo-size <number>', 'Maximum queue size that triggers processing (`WATCHER_CARGO_SIZE`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_CARGO_SIZE) ?? 25)
.option('--history-cargo-size <number>', 'Maximum queue size that triggers writing to the history file(`WATCHER_HISTORY_CARGO_SIZE`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_HISTORY_CARGO_SIZE) ?? 10)
.option('--history-cargo-delay <ms>', 'Milliseconds to delay processing the history file write queue (`WATCHER_HISTORY_CARGO_DELAY`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_HISTORY_CARGO_DELAY) ?? 290000)
.option('--cargo-size <number>', 'Maximum queue size that triggers processing (`WATCHER_CARGO_SIZE`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_CARGO_SIZE) ?? 10)
.option('--history-cargo-size <number>', 'Maximum queue size that triggers writing to the history file(`WATCHER_HISTORY_CARGO_SIZE`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_HISTORY_CARGO_SIZE) ?? parseIntegerEnv(pe.WATCHER_CARGO_SIZE) ?? 10)
.option('--create-objects', 'Create Assets or STIG Assignments as needed (`WATCHER_CREATE_OBJECTS=1`). Negate with `--no-create-objects`.', getBoolean('WATCHER_CREATE_OBJECTS', true))
.option('--no-create-objects', 'Do not create Assets or STIG Assignments (`WATCHER_CREATE_OBJECTS=0`).')
.option('--ignore-dir [name...]', 'DEPRECATED, use --ignore-glob. Sub-directory name to ignore. Can be invoked multiple times.(`WATCHER_IGNORE_DIRS=<csv>`)', pe.WATCHER_IGNORE_DIRS?.split(','))
Expand Down
41 changes: 24 additions & 17 deletions lib/scan.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { serializeError } from 'serialize-error'
import fg from 'fast-glob'
import Queue from 'better-queue';
import lineByLine from 'n-readlines';
import fs from 'node:fs';
import fs from 'fs';

const component = 'scan'
let history = new Set(); // in memory history set
Expand All @@ -14,6 +14,7 @@ const historyBatchDelayTimeout = 30000; // 30 sec
let writeQueue; // queue



/**
* Starts the scanner file scanner and manages the history of scanned files.
*
Expand Down Expand Up @@ -41,7 +42,7 @@ export default async function startScanner(options) {
currentFilesSet.add(entry);
logger.verbose({ component: component, message: `discovered file`, file: entry });
// check if the file is in the history
if (getHistory().has(entry)) {
if (history.has(entry)) {
logger.verbose({component: component, message: `history match`, file: entry})
}
// if the file is not in the history, add it to the parse queue and the history set
Expand Down Expand Up @@ -74,12 +75,13 @@ export default async function startScanner(options) {
* @param {Set} currentFilesSet - The set of current files.
*/
function processFileIntersect(currentFilesSet){
for (const file of getHistory()) {
for (const file of history) {
if (!currentFilesSet.has(file)) {
// Remove stale files from history
removeFromHistory(file);
}
}

}

// Schedule the next scan
Expand All @@ -106,6 +108,7 @@ function scheduleNextScan(options) {
* @param {string} options.historyFile - The path to the history file.
*/
function initHistory(options) {

// no history file, no need to initialize
if (!options.historyFile) return;

Expand Down Expand Up @@ -147,6 +150,11 @@ function initWriteQueue(options) {
// flag to indicate if we have pending entries to remove from the history file
let shouldRemoveEntries = false;

// if batch is size one cast a batch to an array
batch = Array.isArray(batch) ? batch : [batch]

// build adds and removals here

// process each task in the batch
for (const task of batch) {
if (task.operation === 'add') {
Expand All @@ -165,9 +173,9 @@ function initWriteQueue(options) {
}
done();
},{
batchSize: 4,
batchDelay: 20000,
batchDelayTimeout: historyBatchDelayTimeout,
batchSize: options.historyCargoSize,
batchDelay: options.historyCargoDelay,
// batchDelayTimeout: historyBatchDelayTimeout,
});

}
Expand Down Expand Up @@ -229,14 +237,14 @@ function setHistory(historySet) {
history = historySet;
}



/**
* Removes files from the history set and if needed pushes tasks to the write queue.
* @param {string|string[]} files - The file(s) to be removed.
*/
function removeFromHistory(files) {

// push a single item to queue

// process array of files
if (Array.isArray(files)) {
for (const entry of files) {
Expand Down Expand Up @@ -302,6 +310,7 @@ function saveCurrentHistoryToFile() {
try {
const data = Array.from(history).join('\n') + '\n';
fs.writeFileSync(historyFilePath, data);

logger.info({
component:component,
message: `history file overwritten with new data from memory`,
Expand All @@ -319,23 +328,21 @@ function saveCurrentHistoryToFile() {
function flushWriteQueue() {
return new Promise((resolve, reject) => {
writeQueue.on('drain', resolve); // Resolve when the queue is drained
logger.info({
component: component,
message: `flushing write queue`
});
});
}


// Handle shutdown
process.on('SIGINT', async () => {

// if we don't have a history file, just exit
if(!historyFilePath) process.exit(0);

logger.info({
component: component,
message: `received SIGINT. Having history and exiting`
message: `received SIGINT`
});
// Flush the write queue
flushWriteQueue();
saveCurrentHistoryToFile();
process.exit(0);
exitSafely();
});

export { scheduleNextScan,
Expand Down
26 changes: 23 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@
},
"devDependencies": {
"chai": "^5.0.3",
"esmock": "^2.6.3",
"esbuild": "^0.20.0",
"mocha": "^10.2.0",
"nodemon": "^3.0.3",
"sinon": "^17.0.1",
"esbuild": "^0.20.0"
"sinon": "^17.0.1"
}
}
Loading

0 comments on commit 6e8cdb0

Please sign in to comment.