diff --git a/src/models/aws-jobstatus.ts b/src/models/aws-jobstatus.ts new file mode 100644 index 0000000..a3aa3f4 --- /dev/null +++ b/src/models/aws-jobstatus.ts @@ -0,0 +1,7 @@ +export enum JobStatus { + CANCELED = 'CANCELED', + COMPLETE = 'COMPLETE', + ERROR = 'ERROR', + PROGRESSING = 'PROGRESSING', + SUBMITTED = 'SUBMITTED' +} diff --git a/src/pipelines/aws/aws-pipeline.ts b/src/pipelines/aws/aws-pipeline.ts index 5b7c9c9..c978cb5 100644 --- a/src/pipelines/aws/aws-pipeline.ts +++ b/src/pipelines/aws/aws-pipeline.ts @@ -1,6 +1,8 @@ import { ECSClient, RunTaskCommand } from '@aws-sdk/client-ecs'; import { CreateJobCommand, + CreateJobCommandOutput, + GetJobCommand, MediaConvertClient } from '@aws-sdk/client-mediaconvert'; import { @@ -12,6 +14,7 @@ import { } from '@aws-sdk/client-s3'; import { getSignedUrl } from '@aws-sdk/s3-request-presigner'; import { Resolution } from '../../models/resolution'; +import { JobStatus } from '../../models/aws-jobstatus'; import { Pipeline } from '../pipeline'; import { AWSPipelineConfiguration } from './aws-pipeline-configuration'; import fs from 'fs'; @@ -24,6 +27,7 @@ import { } from '../../models/quality-analysis-model'; import logger from '../../logger'; import { runFfprobe } from '../../pairVmaf'; +import { delay } from '../../utils'; export function isS3URI(url: string): boolean { try { @@ -39,7 +43,8 @@ export default class AWSPipeline implements Pipeline { private s3: S3Client; private mediaConvert: MediaConvertClient; private ecs: ECSClient; - private static readonly MAX_WAIT_TIME = 28800; //Max wait time for AWS resources is 28800 seconds (8 hours). + private static readonly MAX_WAIT_TIME_S = 28800; //Max wait time for AWS resources is 28800 seconds (8 hours). + private static readonly MEDIACONVERT_CHECK_JOB_INTERVAL_MS = 10000; //Check status of mediaconvert job interval in seconds. constructor(configuration: AWSPipelineConfiguration) { this.configuration = configuration; @@ -84,10 +89,43 @@ export default class AWSPipeline implements Pipeline { } } + async getMediaConvertJobStatus(jobId: string): Promise { + try { + const command = new GetJobCommand({ Id: jobId }); + const response = await this.mediaConvert.send(command); + if (response.Job?.Status === JobStatus.ERROR) { + logger.error(`Job ${jobId} error ${response.Job?.ErrorMessage}`); + } else if (response.Job?.Status === JobStatus.CANCELED) { + logger.error(`Job ${jobId} ${response.Job?.Status}`); + } + return response.Job?.Status; + } catch (error) { + logger.error(`Error getting job ${jobId}: ${error}`); + return undefined; + } + } + + async loopGetJobStatusUntilFinished( + jobId: string + ): Promise { + let status: string | undefined = await this.getMediaConvertJobStatus(jobId); + while ( + status !== JobStatus.COMPLETE && + status !== JobStatus.ERROR && + status !== JobStatus.CANCELED && + status !== undefined + ) { + logger.debug(`Job ${jobId} status: ${status}. Waiting...`); + status = await this.getMediaConvertJobStatus(jobId); + await delay(AWSPipeline.MEDIACONVERT_CHECK_JOB_INTERVAL_MS); + } + return status; + } + async waitForObjectInS3(S3Bucket: string, S3Key: string): Promise { try { await waitUntilObjectExists( - { client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME }, + { client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME_S }, { Bucket: S3Bucket, Key: S3Key } ); return true; @@ -230,13 +268,14 @@ export default class AWSPipeline implements Pipeline { // Transcode logger.info('Transcoding ' + inputFilename + ' to ' + outputURI + '...'); + let createJobResponse: CreateJobCommandOutput; try { const accelerationSettings = this.configuration.accelerationMode ? { AccelerationSettings: { Mode: this.configuration.accelerationMode } } : {}; - await this.mediaConvert.send( + createJobResponse = await this.mediaConvert.send( new CreateJobCommand({ Role: this.configuration.mediaConvertRole, Settings: settings, @@ -249,12 +288,20 @@ export default class AWSPipeline implements Pipeline { ); throw error; } - - const s3Status = await this.waitForObjectInS3( - outputBucket, - `${outputFolder}/${outputObject}` + if (!createJobResponse.Job?.Id) { + logger.error(`No Job from create response ${inputFilename}`); + return ''; + } + const mediaConvertJobStatus = await this.loopGetJobStatusUntilFinished( + createJobResponse.Job.Id ); - if (!s3Status) return ''; + if ( + mediaConvertJobStatus === JobStatus.ERROR || + mediaConvertJobStatus === JobStatus.CANCELED || + mediaConvertJobStatus === undefined + ) { + return ''; + } await this.probeMetadata(outputBucket, outputFolder, outputObject); logger.info('Finished transcoding ' + inputFilename + '.'); diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..d42ba45 --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,3 @@ +export async function delay(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); +}