Skip to content

Commit

Permalink
First iteration of interval history management + tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Matte22 committed Feb 14, 2024
1 parent 6e8cdb0 commit 6640e1b
Show file tree
Hide file tree
Showing 9 changed files with 851 additions and 716 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ watched/
.env
*.log
log.json
bundle.js
bundle.js
coverage/
3 changes: 1 addition & 2 deletions lib/args.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ program
.option('--add-existing', 'For `--mode events`, existing files in the path will generate an `add` event (`WATCHER_ADD_EXISTING=1`). Ignored if `--mode scan`, negate with `--no-add-existing`.', getBoolean('WATCHER_ADD_EXISTING', false))
.option('--no-add-existing', 'Ignore existing files in the watched path (`WATCHER_ADD_EXISTING=0`).')
.option('--cargo-delay <ms>', 'Milliseconds to delay processing the queue (`WATCHER_CARGO_DELAY`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_CARGO_DELAY) ?? 2000)
.option('--history-cargo-delay <ms>', 'Milliseconds to delay processing the history file write queue (`WATCHER_HISTORY_CARGO_DELAY`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_HISTORY_CARGO_DELAY) ?? 290000)
.option('--history-write-interval <ms>', 'Interval in milliseconds for when to periodically sync history file(`WATCHER_HISTORY_WRITE_INTERVAL`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_HISTORY_WRITE_INTERVAL) ?? 15000)
.option('--cargo-size <number>', 'Maximum queue size that triggers processing (`WATCHER_CARGO_SIZE`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_CARGO_SIZE) ?? 10)
.option('--history-cargo-size <number>', 'Maximum queue size that triggers writing to the history file(`WATCHER_HISTORY_CARGO_SIZE`)', parseIntegerArg, parseIntegerEnv(pe.WATCHER_HISTORY_CARGO_SIZE) ?? parseIntegerEnv(pe.WATCHER_CARGO_SIZE) ?? 10)
.option('--create-objects', 'Create Assets or STIG Assignments as needed (`WATCHER_CREATE_OBJECTS=1`). Negate with `--no-create-objects`.', getBoolean('WATCHER_CREATE_OBJECTS', true))
.option('--no-create-objects', 'Do not create Assets or STIG Assignments (`WATCHER_CREATE_OBJECTS=0`).')
.option('--ignore-dir [name...]', 'DEPRECATED, use --ignore-glob. Sub-directory name to ignore. Can be invoked multiple times.(`WATCHER_IGNORE_DIRS=<csv>`)', pe.WATCHER_IGNORE_DIRS?.split(','))
Expand Down
257 changes: 60 additions & 197 deletions lib/scan.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@ import { logger } from './logger.js'
import { queue } from './parse.js'
import { serializeError } from 'serialize-error'
import fg from 'fast-glob'
import Queue from 'better-queue';
import lineByLine from 'n-readlines';
import fs from 'fs';

const component = 'scan'
let history = new Set(); // in memory history set
const pendingRemovals = new Set(); // in memory set of entries to be removed from history file
let historyFilePath = null; // path to history file
const historyBatchDelayTimeout = 30000; // 30 sec
let writeQueue; // queue


let isWriteScheduled = false; // flag to indicate if there is pending files to write to the history file

/**
* Starts the scanner file scanner and manages the history of scanned files.
Expand All @@ -26,37 +21,32 @@ let writeQueue; // queue
*
*/
export default async function startScanner(options) {
let currentFilesSet = new Set(); // in memory set of files discovered in the current scan
let discoveredFiles = new Set(); // in memory set of files discovered in the current scan
try {
// scan the path for files
const stream = fg.stream([`${options.path}/**/*.ckl`, `${options.path}/**/*.xml`, `${options.path}/**/*.cklb`], {
dot: !options.ignoreDot,
suppressErrors: true,
ignore: options.ignoreGlob ?? []
});

logger.info({ component: component, message: `scan started`, path: options.path });

// for each file discovered
for await (const entry of stream) {
currentFilesSet.add(entry);
discoveredFiles.add(entry);
logger.verbose({ component: component, message: `discovered file`, file: entry });
// check if the file is in the history
if (history.has(entry)) {
logger.verbose({component: component, message: `history match`, file: entry})
}
// if the file is not in the history, add it to the parse queue and the history set
// if the file is not in the history, add it to the in memory history set.
else {
// remove this add when resolving issue #62!!!
addToHistory(entry);
queue.push(entry)
logger.info({component: component, message: `queued for parsing`, file: entry})
}
}

//Identify stale files: those in the history but not in the current scan
processFileIntersect(currentFilesSet);

removeStaleFiles(discoveredFiles);
logger.info({ component: component, message: `scan ended`, path: options.path });
} catch (e) {
logger.error({ component: component, error: serializeError(e) });
Expand All @@ -70,18 +60,15 @@ export default async function startScanner(options) {
}
/**
* Processes the file intersection between the current files set and the history.
* Removes stale files from the history.
* Removes stale files from in memory history.
*
* @param {Set} currentFilesSet - The set of current files.
*/
function processFileIntersect(currentFilesSet){
for (const file of history) {
if (!currentFilesSet.has(file)) {
// Remove stale files from history
removeFromHistory(file);
}
function removeStaleFiles(currentFilesSet){
const staleFiles = Array.from(history).filter(file => !currentFilesSet.has(file));
if (staleFiles.length > 0) {
removeFromHistory(staleFiles);
}

}

// Schedule the next scan
Expand All @@ -102,8 +89,8 @@ function scheduleNextScan(options) {


/**
* Initializes the history by reading it from a file and adding each line to the history set.
* Also Initalizes a write queue if needed to manage the history file.
* Initializes the history in memory by reading it from a file and adding each line to the history set.
* Also Initalizes a write interval for the history file.
* @param {object} options - The watcher startup config options.
* @param {string} options.historyFile - The path to the history file.
*/
Expand All @@ -114,14 +101,10 @@ function initHistory(options) {

historyFilePath = options.historyFile;

// create a queue if we need it
initWriteQueue(options);

// ensure we have a history file and read it into memory
if (historyFilePath && fs.existsSync(historyFilePath)) {
const liner = new lineByLine(historyFilePath);
let line;

while (line = liner.next()) {
// add each line to the history set
history.add(line.toString('ascii'));
Expand All @@ -133,184 +116,77 @@ function initHistory(options) {

});
}
setInterval(() => {
if (!isWriteScheduled) return;
writeHistoryToFile()
isWriteScheduled = false;
}, options.historyWriteInterval);

}

/**
* Initializes the write queue for history file management.
*
* @param {object} options - The options for the write queue.
* @param {number} options.historyCargoSize - The size of each batch in the write queue.
* @param {number} options.historyCargoDelay - The delay between each batch in the write queue.
* Removes files from the history set and schedules a write to the history file.
* @param {string|string[]} files - The file(s) to be removed.
*/
function initWriteQueue(options) {

// queue for history file management
writeQueue = new Queue((batch, done) => {

// flag to indicate if we have pending entries to remove from the history file
let shouldRemoveEntries = false;

// if batch is size one cast a batch to an array
batch = Array.isArray(batch) ? batch : [batch]

// build adds and removals here

// process each task in the batch
for (const task of batch) {
if (task.operation === 'add') {
// Append 'add' operations to the history file
appendToHistoryFile(task.entry);
} else if (task.operation === 'remove') {
// Add 'remove' operations to the pendingRemovals set
pendingRemovals.add(task.entry);
shouldRemoveEntries = true;
function removeFromHistory (files) {
// process array of files
if (Array.isArray(files)) {
for (const entry of files) {
history.delete(entry)
}
}

// if we have pending removals, remove them`
if (shouldRemoveEntries) {
removeFromHistoryFile();
// process single file
else {
history.delete(files)
}
done();
},{
batchSize: options.historyCargoSize,
batchDelay: options.historyCargoDelay,
// batchDelayTimeout: historyBatchDelayTimeout,
});

}


/**
* Appends a single entry to the history file.
* @param {string} entry - The entry to be appended.
*/
function appendToHistoryFile(entry) {
// apending a single entry to the history file
fs.appendFile(historyFilePath, entry + '\n', (err) => {
if (err) {
logger.error({
component: component,
error: serializeError(err),
message: 'Failed to append to history file'
});
} else {
logger.info({
component: component,
message: `wrote entry to history file`,
file: entry
});
}
});
}


/**
* Removes entries from the history file.
* Will rewrite the history file with the pending entries removed.
*/
function removeFromHistoryFile() {
// read the history file into memory
const fileContent = fs.readFileSync(historyFilePath, 'utf-8');
const lines = fileContent.split('\n');
// filter out the entries to be removed
const newContent = lines.filter(line => !pendingRemovals.has(line)).join('\n');
// rewrite
fs.writeFileSync(historyFilePath, newContent);
isWriteScheduled = true; // Indicate that there's work to be done
logger.info({
component: component,
message: `removed entries from history file`,
file: Array.from(pendingRemovals)
});
// clear the pending removals set
pendingRemovals.clear();
}


function getHistory() {
return new Set(history); // Return a copy to prevent direct manipulation
component: component,
message: `removed from history`,
file: files
})
}


// set the history set
function setHistory(historySet) {
history = historySet;
}

/**
* Removes files from the history set and if needed pushes tasks to the write queue.
* @param {string|string[]} files - The file(s) to be removed.
* Adds files to the history set and schedules a write to the history file.
* @param {Array|string} files - The file(s) to be added.
*/
function removeFromHistory(files) {

// push a single item to queue

function addToHistory (files) {
// process array of files
if (Array.isArray(files)) {
for (const entry of files) {
history.delete(entry);
// only push to the write queue if we have a history file
if(historyFilePath) writeQueue.push({ operation: 'remove', entry: entry });
logger.info({
component: component,
message: `removed from history`,
file: entry
});
}
}
// process single file
else {
history.delete(files);
// only push to the write queue if we have a history file
if(historyFilePath) writeQueue.push({ operation: 'remove', entry: files });
logger.info({
component: component,
message: `removed from history`,
file: files
});
for (const entry of files) {
history.add(entry)
}
} else {
// single item
history.add(files)
}

isWriteScheduled = true
logger.info({
component: component,
message: `added to history`,
file: files
})
}

/**
* Adds files to the history set and if needed pushes tasks to the write queue.
* @param {Array|string} files - TThe file(s) to be added.
* Sets the history with a new set of values. Primarily used for testing.
* @param {Set} newSet - The new set of values to be assigned to the history.
*/
function addToHistory(files) {

// process array of files
if (Array.isArray(files)) {
for (const entry of files) {
history.add(entry);
if(historyFilePath) writeQueue.push({ operation: 'add', entry: entry });
logger.info({
component: component,
message: `added to history`,
file: entry
});
};
}
else {
// single item
history.add(files);
if(historyFilePath) writeQueue.push({ operation: 'add', entry: files });
logger.info({
component: component,
message: `added to history`,
file: files
});
}
function setHistory(newSet){
history = newSet;
}


/**
* Saves the current history to a file. (used for process exit)
* Saves the current history set in memory to a history file.
*/
function saveCurrentHistoryToFile() {
function writeHistoryToFile() {
try {
const data = Array.from(history).join('\n') + '\n';
fs.writeFileSync(historyFilePath, data);

logger.info({
component:component,
message: `history file overwritten with new data from memory`,
Expand All @@ -324,33 +200,20 @@ function saveCurrentHistoryToFile() {
}
}

// Flush the write queue
function flushWriteQueue() {
return new Promise((resolve, reject) => {
writeQueue.on('drain', resolve); // Resolve when the queue is drained
logger.info({
component: component,
message: `flushing write queue`
});
});
}


// Handle shutdown
process.on('SIGINT', async () => {
logger.info({
component: component,
message: `received SIGINT`
});
exitSafely();
// write history to file before exiting if we are using a history file
if(historyFilePath)writeHistoryToFile();
process.exit(0)
});

export { scheduleNextScan,
initHistory,
getHistory,
export {
initHistory,
addToHistory,
removeFromHistory,
setHistory,
saveCurrentHistoryToFile,
flushWriteQueue
removeFromHistory
}
Loading

0 comments on commit 6640e1b

Please sign in to comment.