Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent asset fetches #125

Merged
merged 5 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ async function preflightServices () {
logger.info({ component, message: `preflight token request suceeded`})
const promises = [
api.getCollection(options.collectionId),
api.getCollectionAssets(options.collectionId),
api.getInstalledStigs(),
api.getScapBenchmarkMap()
]
Expand Down
67 changes: 61 additions & 6 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful
url: `${options.api}${endpoint}`,
responseType: 'json',
timeout: {
request: CONSTANTS.REQUEST_TIMEOUT
response: options.responseTimeout
}
}

Expand All @@ -61,7 +61,7 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful
}
catch (e) {
// accept a client error for POST /assets if it reports a duplicate name
if (e.response?.statusCode === 400 && e.response?.body?.message === 'Duplicate name') {
if (e.response?.statusCode === 422 && e.response?.body?.message === 'Duplicate name exists') {
logResponse(e.response)
return fullResponse ? e?.response : e?.response?.body
}
Expand All @@ -71,13 +71,40 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful
if (e.response?.statusCode === 403) {
Alarm.noGrant(true)
}
else {
else if (e.code === 'ETIMEDOUT' || e.response?.statusCode === 503) {
Alarm.apiOffline(true)
}
throw (e)
}
}

async function apiGetRequestsParallel({endpoints, authorize = true}) {
const requestOptions = {
responseType: 'json',
timeout: {
response: options.responseTimeout
}
}
if (authorize) {
try {
await getToken()
}
catch (e) {
e.component = 'api'
logError(e)
throw(e)
}
requestOptions.headers = {
Authorization: `Bearer ${tokens.access_token}`
}
}
const requests = []
for (const endpoint of endpoints) {
requests.push(got.get(endpoint, requestOptions))
}
return Promise.all(requests)
}

export async function getScapBenchmarkMap() {
const body = await apiRequest({endpoint: '/stigs/scap-maps'})
cache.scapBenchmarkMap = new Map(body.map(apiScapMap => [apiScapMap.scapBenchmarkId, apiScapMap.benchmarkId]))
Expand All @@ -98,9 +125,37 @@ export async function getCollection(collectionId) {
return cache.collection
}

export async function getCollectionAssets(collectionId) {
cache.assets = await apiRequest({endpoint: `/assets?collectionId=${collectionId}&projection=stigs`})
return cache.assets
export async function getCollectionAssets({collectionId, targets}) {

let namedEndpoints = []
let metadataEndpoints = []

if (targets) {
const names = targets.filter(t => !t.metadata.cklHostName).map(t => t.name)
const cklMetadatas = targets.filter(t => t.metadata.cklHostName).map(t => (({cklHostName, cklWebDbSite, cklWebDbInstance}) => ({cklHostName, cklWebDbSite, cklWebDbInstance}))(t.metadata))

namedEndpoints = names.map(name => `${options.api}/assets?collectionId=${collectionId}&name=${encodeURIComponent(name)}&projection=stigs`)
metadataEndpoints = cklMetadatas.map(md => {
let metadataParams = []
for (const metadataKey of Object.keys(md).filter(k=>md[k])) {
metadataParams.push(`metadata=${metadataKey}%3A${encodeURIComponent(md[metadataKey])}`)
}
return `${options.api}/assets?collectionId=${collectionId}&${metadataParams.join('&')}&projection=stigs`
})
const responses = await apiGetRequestsParallel({
endpoints: new Set([...namedEndpoints, ...metadataEndpoints])
})
const assets = []
for (const response of responses) {
logResponse (response)
if (response.body?.[0]) assets.push(response.body[0])
}
return assets
}
else {
cache.assets = await apiRequest({endpoint: `/assets?collectionId=${collectionId}&projection=stigs`})
return cache.assets
}
}

export async function getInstalledStigs() {
Expand Down
1 change: 1 addition & 0 deletions lib/args.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ program
.option('--no-ignore-dot', 'Do not ignore dotfiles in the path (`WATCHER_IGNORE_DOT=0`).')
.option('--strict-revision-check', 'For CKL, ignore checklist of uninstalled STIG revision (`WATCHER_STRICT_REVISION_CHECK=1`). Negate with `--no-strict-revision-check`.', getBoolean('WATCHER_STRICT_REVISION_CHECK', false))
.option('--no-strict-revision-check', 'For CKL, allow checklist of uninstalled STIG revision (`WATCHER_STRICT_REVISION_CHECK=0`). This is the default behavior.')
.option('--response-timeout <ms>', 'Specify the timeout duration in milliseconds for an API response to begin. If a response takes longer than this time, an error will be thrown.', parseIntegerArg, parseIntegerEnv(pe.WATCHER_RESPONSE_TIMEOUT) ?? 20000) // 20 secs

// Parse ARGV and get the parsed options object
// Options properties are created as camelCase versions of the long option name
Expand Down
6 changes: 3 additions & 3 deletions lib/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async function getOpenIDConfiguration () {
const requestOptions = {
responseType: 'json',
timeout: {
request: CONSTANTS.REQUEST_TIMEOUT
response: options.responseTimeout
}
}
let response
Expand Down Expand Up @@ -122,7 +122,7 @@ async function authenticateClientSecret () {
password: options.clientSecret,
responseType: 'json',
timeout: {
request: CONSTANTS.REQUEST_TIMEOUT
response: options.responseTimeout
}
}

Expand Down Expand Up @@ -180,7 +180,7 @@ async function authenticateSignedJwt () {
},
responseType: 'json',
timeout: {
request: CONSTANTS.REQUEST_TIMEOUT
response: options.responseTimeout
}
}
logger.debug({
Expand Down
27 changes: 16 additions & 11 deletions lib/cargo.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async function writer ( taskAsset ) {
const component = 'writer'
try {
logger.debug({
component: component,
component,
message: `${taskAsset.assetProps.name} started`
})

Expand All @@ -23,7 +23,7 @@ async function writer ( taskAsset ) {
// GET projection=stigs is an object, we just need the benchmarkIds
r.apiAsset.stigs = r.apiAsset.stigs.map ( stig => stig.benchmarkId )
logger.info({ component: component, message: `asset ${r.created ? 'created' : 'found'}`, asset: r.apiAsset })
// TODO: If created === false, then STIG assignments should be vetted again
// Iterate: If created === false, then STIG assignments should be vetted again
taskAsset.assetProps = r.apiAsset
}

Expand All @@ -50,7 +50,7 @@ async function writer ( taskAsset ) {
if (reviews.length > 0) {
const r = await api.postReviews(options.collectionId, taskAsset.assetProps.assetId, reviews)
logger.info({
component: component,
component,
message: `posted reviews`,
asset: { name: taskAsset.assetProps.name, id: taskAsset.assetProps.assetId },
rejected: r.rejected,
Expand All @@ -59,7 +59,7 @@ async function writer ( taskAsset ) {
}
else {
logger.warn({
component: component,
component,
message: `no reviews to post`,
asset: { name: taskAsset.assetProps.name, id: taskAsset.assetProps.assetId },
})
Expand Down Expand Up @@ -94,24 +94,30 @@ async function writer ( taskAsset ) {
}

async function resultsHandler( parsedResults, cb ) {
const component = 'batch'
try {
batchId++
const isModeScan = options.mode === 'scan'
logger.info({component: component, message: `batch started`, batchId: batchId, size: parsedResults.length})
const apiAssets = await api.getCollectionAssets(options.collectionId)
logger.info({component, message: `batch started`, batchId, size: parsedResults.length})
const apiAssets = await api.getCollectionAssets(
{
collectionId: options.collectionId,
targets: parsedResults.map(pr=>pr.target)
}
)
logger.info({component, message: `asset data received`, batchId, size: apiAssets.length})
const apiStigs = await api.getInstalledStigs()
const tasks = new TaskObject ({ parsedResults, apiAssets, apiStigs, options:options })
logger.info({component, message: `stig data received`, batchId, size: apiStigs.length})
const tasks = new TaskObject ({ parsedResults, apiAssets, apiStigs, options })
isModeScan && tasks.errors.length && addToHistory(tasks.errors.map(e => e.sourceRef))
for ( const taskAsset of tasks.taskAssets.values() ) {
const success = await writer( taskAsset )
isModeScan && success && addToHistory(taskAsset.sourceRefs)
}
logger.info({component: component, message: 'batch ended', batchId: batchId})
logger.info({component, message: 'batch ended', batchId})
cb()
}
catch (e) {
logger.error({component: component, message: e.message, error: serializeError(e)})
logger.error({component, message: 'batch ended', error: serializeError(e), batchId})
cb( e, undefined)
}
}
Expand All @@ -120,7 +126,6 @@ const cargoQueue = new Queue(resultsHandler, {
id: 'file',
batchSize: options.cargoSize,
batchDelay: options.oneShot ? 0 : options.cargoDelay,
// batchDelayTimeout: options.cargoDelay
})
cargoQueue
.on('batch_failed', (err) => {
Expand Down
3 changes: 1 addition & 2 deletions lib/consts.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ export const ERR_AUTHOFFLINE = 2
export const ERR_NOTOKEN = 3
export const ERR_NOGRANT = 4
export const ERR_UNKNOWN = 5
export const ERR_FAILINIT = 6
export const REQUEST_TIMEOUT = 5000
export const ERR_FAILINIT = 6