Skip to content

Commit

Permalink
Merge pull request #101 from dfpc-coe/mission-cots
Browse files Browse the repository at this point in the history
TS Strict Mode
  • Loading branch information
ingalls authored Feb 13, 2024
2 parents 9fd534f + 4d1c812 commit 6292f9e
Show file tree
Hide file tree
Showing 42 changed files with 2,442 additions and 948 deletions.
37 changes: 10 additions & 27 deletions api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import Schema from '@openaddresses/batch-schema';
import Err from '@openaddresses/batch-error';
import Modeler from '@openaddresses/batch-generic';
import minimist from 'minimist';
import ConnectionPool, { ConnectionWebSocket, sleep } from './lib/connection-pool.js';
import { ConnectionWebSocket, sleep } from './lib/connection-pool.js';
import EventsPool from './lib/events-pool.js';
import { WebSocket, WebSocketServer } from 'ws';
import Cacher from './lib/cacher.js';
import BlueprintLogin, { tokenParser, AuthUser } from '@tak-ps/blueprint-login';
import Config from './lib/config.js';
import TAKAPI, { APIAuthPassword } from './lib/tak-api.js';
Expand Down Expand Up @@ -58,33 +57,20 @@ if (import.meta.url === `file://${process.argv[1]}`) {
postgres: process.env.POSTGRES || args.postgres || 'postgres://postgres@localhost:5432/tak_ps_etl',
nometrics: args.nometrics || false,
nosinks: args.nosinks || false,
nocache: args.nocache || false,
local: args.local || false,
});
await server(config);
}

export default async function server(config: Config) {
config.cacher = new Cacher(args.nocache, config.silent);

try {
await config.cacher.flush();
} catch (err) {
console.log(`ok - failed to flush cache: ${err instanceof Error? err.message : String(err)}`);
}

try {
const ServerModel = new Modeler(config.pg, pgschema.Server);
config.server = await ServerModel.from(1);
} catch (err) {
console.log(`ok - no server config found: ${err instanceof Error ? err.message : String(err)}`);
}

if (config.server) {
config.conns = new ConnectionPool(config, config.server, config.wsClients, config.StackName, config.local);
await config.conns.init();
} else {
console.error('not ok - no connection pool due to lack of server config');
}
await config.conns.init();

config.events = new EventsPool(config.StackName);
if (!config.noevents) await config.events.init(config.pg);
Expand Down Expand Up @@ -139,23 +125,20 @@ export default async function server(config: Config) {
const ProfileModel = new Modeler(config.pg, pgschema.Profile);

// On each successful login, ensure a Profile row exists for the user.
// First-time logins (404 from ProfileModel.from) get a profile generated
// with TAK credentials minted via the just-validated password.
login.on('login', async (user) => {
    try {
        await ProfileModel.from(user.username);
    } catch (err) {
        if (err instanceof Err && err.status === 404) {
            try {
                const api = await TAKAPI.init(new URL(config.MartiAPI), new APIAuthPassword(user.username, user.password));

                await ProfileModel.generate({
                    username: user.username,
                    auth: await api.Credentials.generate()
                });
            } catch (err) {
                // Nothing awaits an async event handler — an uncaught
                // rejection here would take down the process, so log instead.
                console.error(err);
            }
        } else {
            // Any non-404 lookup failure is logged and swallowed so a
            // profile hiccup never blocks the login event itself.
            return console.error(err);
        }
    }
});

await schema.blueprint(login);
Expand Down
8 changes: 6 additions & 2 deletions api/lib/api/groups.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,23 @@ export default class {

/**
 * List all groups visible on the TAK Server.
 *
 * @param query - Query parameters forwarded to the Marti API
 *                (`useCache` plus any additional keys, stringified).
 * @returns The TAK Server group listing envelope.
 */
async list(query: {
    useCache?: string;

    [key: string]: unknown;
}): Promise<TAKList<Group>> {
    const url = new URL(`/Marti/api/groups/all`, this.api.url);

    // Values are typed `unknown`, so coerce explicitly before appending.
    for (const q in query) url.searchParams.append(q, String(query[q]));

    return await this.api.fetch(url, {
        method: 'GET'
    });
}

async update(body: Group[], query: {
clientUid?: string;

[key: string]: unknown;
}): Promise<void> {
const url = new URL(`/Marti/api/groups/active`, this.api.url);
for (const q in query) url.searchParams.append(q, query[q]);
for (const q in query) url.searchParams.append(q, String(query[q]));

await this.api.fetch(url, {
method: 'PUT',
Expand Down
89 changes: 54 additions & 35 deletions api/lib/asset.ts
Original file line number Diff line number Diff line change
@@ -1,66 +1,85 @@
import Err from '@openaddresses/batch-error';
import path from 'path';
import S3 from '../lib/aws/s3.js';
import { _Object } from '@aws-sdk/client-s3';

import Config from '../lib/config.js';

export default async function AssetList(config: Config, prefix: string) {
export type AssetListOutput = {
total: number;
tiles: {
url: string;
},
assets: Array<AssetOutput>
}

export type AssetOutput = {
name: string;
visualized?: string;
vectorized?: string,
updated: number;
etag: string;
size: number;
}

export default async function AssetList(config: Config, prefix: string): Promise<AssetListOutput> {
try {
const viz = new Map() ;
const geo = new Map() ;
let assets = [];
(await S3.list(prefix)).map((l) => {
if (path.parse(l.Key).ext === '.pmtiles') viz.set(path.parse(l.Key).name, l)
else if (path.parse(l.Key).ext === '.geojsonld') geo.set(path.parse(l.Key).name, l)
else assets.push(l)
});
const assets: Array<_Object> = [];
(await S3.list(prefix))
.map((l) => {
if (path.parse(String(l.Key)).ext === '.pmtiles') viz.set(path.parse(String(l.Key)).name, l)
else if (path.parse(String(l.Key)).ext === '.geojsonld') geo.set(path.parse(String(l.Key)).name, l)
else assets.push(l)
});

assets = assets.map((a) => {
const isViz = viz.get(path.parse(a.Key).name);
if (isViz) viz.delete(path.parse(a.Key).name);
const isGeo = geo.get(path.parse(a.Key).name);
if (isGeo) geo.delete(path.parse(a.Key).name);
const final: AssetOutput[] = assets.map((a: _Object) => {
const isViz = viz.get(path.parse(String(a.Key)).name);
if (isViz) viz.delete(path.parse(String(a.Key)).name);
const isGeo = geo.get(path.parse(String(a.Key)).name);
if (isGeo) geo.delete(path.parse(String(a.Key)).name);

return {
name: a.Key.replace(prefix, ''),
visualized: isViz ? path.parse(a.Key.replace(prefix, '')).name + '.pmtiles' : false,
vectorized: isGeo ? path.parse(a.Key.replace(prefix, '')).name + '.geojsonld' : false,
updated: new Date(a.LastModified).getTime(),
etag: JSON.parse(a.ETag),
size: a.Size
name: String(a.Key).replace(prefix, ''),
visualized: isViz ? path.parse(String(a.Key).replace(prefix, '')).name + '.pmtiles' : undefined,
vectorized: isGeo ? path.parse(String(a.Key).replace(prefix, '')).name + '.geojsonld' : undefined,
updated: a.LastModified ? new Date(a.LastModified).getTime() : new Date().getTime(),
etag: String(JSON.parse(String(a.ETag))),
size: a.Size ? a.Size : 0
};
}).concat(Array.from(geo.values()).map((a) => {
const isViz = viz.get(path.parse(a.Key).name);
if (isViz) viz.delete(path.parse(a.Key).name);
const isViz = viz.get(path.parse(String(a.Key)).name);
if (isViz) viz.delete(path.parse(String(a.Key)).name);

return {
name: a.Key.replace(prefix, ''),
visualized: isViz ? path.parse(a.Key.replace(prefix, '')).name + '.pmtiles' : false,
vectorized: a.Key.replace(prefix, ''),
updated: new Date(a.LastModified).getTime(),
etag: JSON.parse(a.ETag),
size: a.Size
name: String(a.Key).replace(prefix, ''),
visualized: isViz ? path.parse(String(a.Key).replace(prefix, '')).name + '.pmtiles' : undefined,
vectorized: String(a.Key).replace(prefix, ''),
updated: a.LastModified ? new Date(a.LastModified).getTime() : new Date().getTime(),
etag: String(JSON.parse(String(a.ETag))),
size: a.Size ? a.Size : 0
};

})).concat(Array.from(viz.values()).map((a) => {
return {
name: a.Key.replace(prefix, ''),
visualized: a.Key.replace(prefix, ''),
vectorized: false,
updated: new Date(a.LastModified).getTime(),
etag: JSON.parse(a.ETag),
size: a.Size
name: String(a.Key).replace(prefix, ''),
visualized: String(a.Key).replace(prefix, ''),
vectorized: undefined,
updated: a.LastModified ? new Date(a.LastModified).getTime() : new Date().getTime(),
etag: String(JSON.parse(String(a.ETag))),
size: a.Size ? a.Size : 0
};
}));

return {
total: assets.length,
total: final.length,
tiles: {
url: String(new URL(`${config.PMTILES_URL}/tiles/${prefix}`))
},
assets
assets: final
};
} catch (err) {
throw new Err(500, err, 'Asset List Error');
throw new Err(500, err instanceof Error ? err : new Error(String(err)), 'Asset List Error');
}
}
10 changes: 5 additions & 5 deletions api/lib/aws/alarm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ export default class Alarm {
AlarmNamePrefix: `${this.stack}-layer-`
}));

for (const alarm of res.MetricAlarms) {
for (const alarm of (res.MetricAlarms || [])) {
let value = 'healthy';
if (alarm.StateValue === 'ALARM') value = 'alarm';
if (alarm.StateValue === 'INSUFFICIENT_DATA') value = 'unknown';

const layer = parseInt(alarm.AlarmName.replace(`${this.stack}-layer-`, ''));
const layer = parseInt(String(alarm.AlarmName).replace(`${this.stack}-layer-`, ''));

map.set(layer, value);
}

return map;
} catch (err) {
throw new Err(500, new Error(err), 'Failed to describe alarms');
throw new Err(500, new Error(err instanceof Error ? err.message : String(err)), 'Failed to describe alarms');
}
}

Expand All @@ -45,14 +45,14 @@ export default class Alarm {
AlarmNames: [`${this.stack}-layer-${layer}`]
}));

if (!res.MetricAlarms.length) return 'unknown';
if (!res.MetricAlarms || !res.MetricAlarms.length) return 'unknown';

let value = 'healthy';
if (res.MetricAlarms[0].StateValue === 'ALARM') value = 'alarm';
if (res.MetricAlarms[0].StateValue === 'INSUFFICIENT_DATA') value = 'unknown';
return value;
} catch (err) {
throw new Err(500, new Error(err), 'Failed to describe alarm');
throw new Err(500, new Error(err instanceof Error ? err.message : String(err)), 'Failed to describe alarm');
}
}
}
29 changes: 19 additions & 10 deletions api/lib/aws/batch-logs.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
import CloudWatchLogs from '@aws-sdk/client-cloudwatch-logs';
import process from 'node:process';

export type LogGroupOutput = {
logs: Array<{
message: string;
timestamp: number;
}>
}

/**
* @class
*/
export default class LogGroup {
/**
 * Fetch all log events for a Batch job's log stream from the shared
 * `/aws/batch/job` CloudWatch log group.
 *
 * A stream that does not exist yet (the job hasn't produced output) is
 * not treated as an error: an empty log list is returned instead.
 *
 * @param stream - CloudWatch log stream name for the Batch job.
 * @returns Log messages with their timestamps, oldest first.
 */
static async list(stream: string): Promise<LogGroupOutput> {
    const cwl = new CloudWatchLogs.CloudWatchLogsClient({ region: process.env.AWS_DEFAULT_REGION });

    try {
        // The send() must run inside the try: the "log stream does not
        // exist" failure is raised here, and the catch below handles it.
        const logs = await cwl.send(new CloudWatchLogs.GetLogEventsCommand({
            logStreamName: stream,
            logGroupName: `/aws/batch/job`,
            startFromHead: true,
        }))

        return {
            logs: (logs.events || []).map((log) => {
                return {
                    message: String(log.message),
                    timestamp: log.timestamp || 0
                }
            })
        }
    } catch (err) {
        if (err instanceof Error && err.message.includes('The specified log stream does not exist')) {
            return { logs: [] }
        } else {
            // Rethrow the original error to preserve its type and stack.
            throw err instanceof Error ? err : new Error(String(err));
        }
    }
}
Expand Down
32 changes: 21 additions & 11 deletions api/lib/aws/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export interface BatchJob {
asset: string;
status: string;
created: number;
updated: number;
updated?: number;
logstream?: string;
}

Expand Down Expand Up @@ -72,13 +72,17 @@ export default class Batch {
static async job(config: Config, jobid: string): Promise<BatchJob> {
const batch = new AWSBatch.BatchClient({ region: process.env.AWS_DEFAULT_REGION });

const jobs = await batch.send(new AWSBatch.DescribeJobsCommand({
const res = await batch.send(new AWSBatch.DescribeJobsCommand({
jobs: [jobid]
}))

if (!jobs.jobs.length) throw new Err(400, null, 'AWS Does not report this job');
if (!res.jobs || !res.jobs.length) throw new Err(400, null, 'AWS Does not report this job');

const job = jobs.jobs[0];
const job = res.jobs[0];

if (!job.jobName) throw new Err(400, null, 'AWS Does not report a jobName')
if (!job.jobId) throw new Err(400, null, 'AWS Does not report a jobId')
if (!job.status) throw new Err(400, null, 'AWS Does not report a Status')

const name = job.jobName.replace(/data-[0-9]+-/, '');
let asset: string[] = [...name];
Expand All @@ -88,37 +92,43 @@ export default class Batch {
id: job.jobId,
asset: asset.join(''),
status: job.status,
created: job.createdAt,
created: job.createdAt || +new Date(),
updated: job.stoppedAt,
logstream: job.container.logStreamName
logstream: job.container ? job.container.logStreamName : undefined
}
}

/**
 * List Batch jobs whose names start with the given prefix in the stack's
 * job queue, newest first.
 *
 * @param config - Service config supplying the CloudFormation StackName.
 * @param prefix - Job-name prefix to filter on.
 * @returns Jobs sorted by creation time, descending.
 * @throws {Err} 400 when AWS omits expected fields from the response.
 */
static async list(config: Config, prefix: string): Promise<BatchJob[]> {
    const batch = new AWSBatch.BatchClient({ region: process.env.AWS_DEFAULT_REGION });

    const res = await batch.send(new AWSBatch.ListJobsCommand({
        jobQueue: `${config.StackName}-queue`,
        filters: [{
            name: 'JOB_NAME',
            values: [`${prefix}-*`]
        }]
    }))

    if (!res.jobSummaryList) throw new Err(400, null, 'AWS Does not report a jobSummaryList')

    const final: BatchJob[] = res.jobSummaryList.map((job) => {
        if (!job.jobName) throw new Err(400, null, 'AWS Does not report a jobName')
        if (!job.jobId) throw new Err(400, null, 'AWS Does not report a jobId')

        // Recover the asset filename: the last '_' in the job name
        // presumably stands in for the extension dot — confirm against
        // the job-submission naming scheme.
        const name = job.jobName.replace(`${prefix}-`, '');
        const asset: string[] = [...name];
        asset[name.lastIndexOf('_')] = '.';

        return {
            id: job.jobId,
            asset: asset.join(''),
            status: String(job.status),
            created: job.createdAt || +new Date(),
            updated: job.stoppedAt,
        };
    }).sort((a, b) => {
        return b.created - a.created;
    });

    return final;
}
};
Loading

0 comments on commit 6292f9e

Please sign in to comment.