Skip to content

Commit

Permalink
refactor: alarm handling (#130)
Browse files Browse the repository at this point in the history
* update all dependencies
  • Loading branch information
csmig authored Jun 17, 2024
1 parent 431d2bd commit f52b98b
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 292 deletions.
67 changes: 54 additions & 13 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { logger, getSymbol } from './logger.js'
import Alarm from './alarm.js'
import * as CONSTANTS from './consts.js'

const component = 'api'
const cache = {
collection: null,
assets: null,
Expand Down Expand Up @@ -33,6 +34,17 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful
method,
url: `${options.api}${endpoint}`,
responseType: 'json',
retry: {
errorCodes: [
'ECONNRESET',
'EADDRINUSE',
'ECONNREFUSED',
'EPIPE',
'ENOTFOUND',
'ENETUNREACH',
'EAI_AGAIN'
]
},
timeout: {
response: options.responseTimeout
}
Expand Down Expand Up @@ -71,7 +83,7 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful
if (e.response?.statusCode === 403) {
Alarm.noGrant(true)
}
else if (e.code === 'ETIMEDOUT' || e.response?.statusCode === 503) {
else if (e.code !== 'ERR_NON_2XX_3XX_RESPONSE') {
Alarm.apiOffline(true)
}
throw (e)
Expand All @@ -81,6 +93,17 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful
async function apiGetRequestsParallel({endpoints, authorize = true}) {
const requestOptions = {
responseType: 'json',
retry: {
errorCodes: [
'ECONNRESET',
'EADDRINUSE',
'ECONNREFUSED',
'EPIPE',
'ENOTFOUND',
'ENETUNREACH',
'EAI_AGAIN'
]
},
timeout: {
response: options.responseTimeout
}
Expand Down Expand Up @@ -142,9 +165,23 @@ export async function getCollectionAssets({collectionId, targets}) {
}
return `${options.api}/assets?collectionId=${collectionId}&${metadataParams.join('&')}&projection=stigs`
})
const responses = await apiGetRequestsParallel({
endpoints: new Set([...namedEndpoints, ...metadataEndpoints])
})
let responses
try {
responses = await apiGetRequestsParallel({
endpoints: new Set([...namedEndpoints, ...metadataEndpoints])
})
}
catch (e) {
logError(e)
if (e.response?.statusCode === 403) {
Alarm.noGrant(true)
}
else if (e.code !== 'ERR_NON_2XX_3XX_RESPONSE') {
Alarm.apiOffline(true)
}
throw (e)
}

const assets = []
for (const response of responses) {
logResponse (response)
Expand Down Expand Up @@ -205,7 +242,7 @@ export function canUserAccept() {

function logResponse (response) {
logger.http({
component: 'api',
component,
message: 'query',
request: {
method: response.request.options?.method,
Expand All @@ -216,7 +253,7 @@ function logResponse (response) {
}
})
logger.debug({
component: 'api',
component,
message: 'query bodies',
request: {
method: response.request.options?.method,
Expand All @@ -231,18 +268,22 @@ function logResponse (response) {
}

function logError (e) {
logger.error({
component: 'api',
const logObject = {
component,
code: e.code,
message: e.message,
request: {
method: e.request?.options?.method,
url: e.request?.requestUrl
} ,
response: {
}
}
if (e.code === 'ERR_NON_2XX_3XX_RESPONSE') {
logObject.response = {
status: e.response?.statusCode,
body: e.response?.body
}
})
}
logger.error(logObject)
}

/**
Expand All @@ -269,7 +310,7 @@ let alarmRetryCount = 0
*/
function offlineRetryHandler() {
logger.info({
component: 'api',
component,
message: 'Testing if API is online'
})
alarmRetryCount++
Expand All @@ -281,7 +322,7 @@ function offlineRetryHandler() {
.catch(() => {
if (alarmRetryCount >= alarmRetryLimit) {
logger.info({
component: 'api',
component,
message: 'API connectivity maximum tries reached, requesting shutdown'
})
Alarm.shutdown(CONSTANTS.ERR_APIOFFLINE)
Expand Down
8 changes: 4 additions & 4 deletions lib/args.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ if (options.logFile) {

logger.logger.log({
level: 'debug',
component: component,
component,
message: 'parsed options',
options: options
})
Expand All @@ -150,7 +150,7 @@ for (const key in CONSTANTS.configBounds) {
if (options[key] < bounds.min || options[key] > bounds.max) {
logger.logger.log({
level: 'error',
component: component,
component,
message: `config value out of bounds: ${key} = ${options[key]}, must be between ${bounds.min} and ${bounds.max}`
})
configValid = false
Expand Down Expand Up @@ -189,7 +189,7 @@ if (configValid) {
// Could not make a private key
logger.logger.log({
level: 'error',
component: component,
component,
message: 'private key error',
file: options.clientKey,
error: e
Expand All @@ -206,7 +206,7 @@ if (configValid) {
if (!options.clientSecret) {
// Don't know the client secret
logger.logger.error({
component: component,
component,
message: 'Missing client secret'
})
configValid = false
Expand Down
55 changes: 47 additions & 8 deletions lib/cargo.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import * as api from './api.js'
import { serializeError } from 'serialize-error'
import { TaskObject } from '@nuwcdivnpt/stig-manager-client-modules'
import { addToHistory } from './scan.js'
import Alarm from './alarm.js'


const component = 'cargo'
let batchId = 0

async function writer ( taskAsset ) {
const component = 'writer'
try {
logger.debug({
component,
Expand All @@ -22,7 +23,7 @@ async function writer ( taskAsset ) {
const r = await api.createOrGetAsset( taskAsset.assetProps )
// GET projection=stigs is an object, we just need the benchmarkIds
r.apiAsset.stigs = r.apiAsset.stigs.map ( stig => stig.benchmarkId )
logger.info({ component: component, message: `asset ${r.created ? 'created' : 'found'}`, asset: r.apiAsset })
logger.info({ component, message: `asset ${r.created ? 'created' : 'found'}`, asset: r.apiAsset })
// Iterate: If created === false, then STIG assignments should be vetted again
taskAsset.assetProps = r.apiAsset
}
Expand All @@ -35,7 +36,7 @@ async function writer ( taskAsset ) {
stigs: taskAsset.assetProps.stigs
})
r.stigs = r.stigs.map( stig => stig.benchmarkId )
logger.info({ component: component, message: `STIG assignments updated`,
logger.info({ component, message: `STIG assignments updated`,
asset: { assetId: r.assetId, name: r.name, stigs: r.stigs } })
}

Expand Down Expand Up @@ -69,7 +70,7 @@ async function writer ( taskAsset ) {
}
catch (error) {
const errorObj = {
component: error.component ?? component,
component,
message: error?.message,
}
if (error.request) {
Expand Down Expand Up @@ -110,8 +111,10 @@ async function resultsHandler( parsedResults, cb ) {
const tasks = new TaskObject ({ parsedResults, apiAssets, apiStigs, options })
isModeScan && tasks.errors.length && addToHistory(tasks.errors.map(e => e.sourceRef))
for ( const taskAsset of tasks.taskAssets.values() ) {
const success = await writer( taskAsset )
isModeScan && success && addToHistory(taskAsset.sourceRefs)
if (!Alarm.isAlarmed()) {
const success = await writer( taskAsset )
isModeScan && success && addToHistory(taskAsset.sourceRefs)
}
}
logger.info({component, message: 'batch ended', batchId})
cb()
Expand All @@ -130,7 +133,7 @@ const cargoQueue = new Queue(resultsHandler, {
cargoQueue
.on('batch_failed', (err) => {
logger.error( {
component: 'cargo',
component,
message: err?.message,
})
})
Expand All @@ -139,9 +142,45 @@ cargoQueue
})
.on('drain', () => {
if (options.oneShot) {
logger.info({component: 'cargo', message: 'finished one shot mode'})
logger.info({component, message: 'finished one shot mode'})
process.exit()
}
})

Alarm.on('alarmRaised', onAlarmRaised)
Alarm.on('alarmLowered', onAlarmLowered)

/**
* @typedef {import('./alarm.js').AlarmType} AlarmType
*/

/**
* Handles raised alarms
* @param {AlarmType} alarmType - The type of alarm.
* Intended to be a callback function of Alarm.on('alarmRaised')
*/
function onAlarmRaised (alarmType) {
logger.info({
component,
message: `pausing queue on alarm raised`,
alarmType
})
cargoQueue.pause()
}

/**
* Handles lowered alarms
* @param {AlarmType} alarmType - The type of alarm.
* Intended to be a callback function of Alarm.on('alarmRaised')
*/
function onAlarmLowered (alarmType) {
logger.info({
component,
message: `resuming queue on alarm lowered`,
alarmType
})
cargoQueue.resume()
}


export { cargoQueue }
25 changes: 2 additions & 23 deletions lib/events.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import {options} from './args.js'
import { logger } from './logger.js'
import { queue } from './parse.js'
import { parseQueue } from './parse.js'
import { serializeError } from 'serialize-error'
import { watch } from 'chokidar'
import Alarm from './alarm.js'

const component = 'events'

Expand All @@ -28,8 +27,6 @@ function startFsEventWatcher () {
})
watcher.on('error', onError )
watcher.on('add', onAdd )
Alarm.on('alarmRaised', onAlarmRaised)
Alarm.on('alarmLowered', onAlarmLowered)
}

function onAdd (file) {
Expand All @@ -42,7 +39,7 @@ function onAdd (file) {
event: 'add',
file
})
queue.push( file )
parseQueue.push( file )
}
}

Expand All @@ -53,22 +50,4 @@ function onError (e) {
})
}

function onAlarmRaised (alarmType) {
logger.info({
component,
message: `pausing parse queue on alarm raised`,
alarmType
})
queue.pause()
}

function onAlarmLowered (alarmType) {
logger.info({
component,
message: `resuming parse queue on alarm lowered`,
alarmType
})
queue.resume()
}

export default startFsEventWatcher
Loading

0 comments on commit f52b98b

Please sign in to comment.