Archive script: Optimizations, bug fixes, stats logging
Lezek123 committed Nov 1, 2024
1 parent 2b45e12 commit 7b7bf26
Showing 7 changed files with 219 additions and 14 deletions.
7 changes: 7 additions & 0 deletions storage-node/src/commands/archive.ts
@@ -221,6 +221,12 @@ Supported values: warn, error, debug, info. Default:debug`,
default: 'DEEP_ARCHIVE',
options: Object.keys(StorageClass) as StorageClass[],
}),
statsLoggingInterval: flags.integer({
description: 'How often the upload/download/compression statistics summary will be logged (in minutes).',
env: 'STATS_LOGGING_INTERVAL',
default: 60,
required: true,
}),
...ApiCommandBase.flags,
}

@@ -375,6 +381,7 @@ Supported values: warn, error, debug, info. Default:debug`,
syncWorkersNum: flags.syncWorkersNumber,
syncWorkersTimeout: flags.syncWorkersTimeout,
syncInterval: flags.syncInterval,
statsLoggingInterval: flags.statsLoggingInterval,
})

await archiveService.init()
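
A minimal illustrative sketch (not part of the commit) of how the new flag is meant to be used: the value is read in minutes, can also be supplied via the STATS_LOGGING_INTERVAL environment variable, and is converted to milliseconds for the periodic summary timer, mirroring the ArchiveService wiring below. The import path is assumed.

// Illustrative only: flag/env value is expressed in minutes; defaults to 60 (one summary per hour).
import { StatsCollectingService } from '../services/archive/stats'

const statsLoggingIntervalMinutes = Number(process.env.STATS_LOGGING_INTERVAL ?? 60)
const statsCollectingService = new StatsCollectingService()

// Log the upload/download/compression summaries on a fixed interval (minutes -> milliseconds).
setInterval(() => statsCollectingService.logSummaries(), statsLoggingIntervalMinutes * 60_000)
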
24 changes: 21 additions & 3 deletions storage-node/src/services/archive/ArchiveService.ts
@@ -19,6 +19,7 @@ import sleep from 'sleep-promise'
import { Logger } from 'winston'
import { StorageClass } from '@aws-sdk/client-s3'
import { CompressionService } from './compression'
import { StatsCollectingService } from './stats'

type DataObjectData = {
id: string
@@ -135,6 +136,8 @@ type ArchiveServiceParams = {
syncInterval: number
// Archive tracking backup
archiveTrackfileBackupFreqMinutes: number
// Stats logging
statsLoggingInterval: number
}

export class ArchiveService {
@@ -176,6 +179,9 @@ export class ArchiveService {
private syncQueueObjectsSize = 0
private syncWorkersTimeout: number
private syncInterval: number
// Statistics
private statsLoggingInterval: number
private statsCollectingService: StatsCollectingService

constructor(params: ArchiveServiceParams) {
// From params:
@@ -196,12 +202,14 @@
this.syncWorkersTimeout = params.syncWorkersTimeout
this.syncInterval = params.syncInterval
this.archiveTrackfileBackupFreqMinutes = params.archiveTrackfileBackupFreqMinutes
this.statsLoggingInterval = params.statsLoggingInterval
// Other:
this.objectTrackingService = new ObjectTrackingService(this.uploadQueueDir)
this.archivesTrackingService = new ArchivesTrackingService(this.uploadQueueDir)
this.dataObjectsQueue = new DataObjectsQueue(this.uploadQueueDir)
this.uploadWorkingStack = new WorkingStack()
this.syncWorkingStack = new WorkingStack()
this.statsCollectingService = new StatsCollectingService()
this.logger = logger.child({ label: 'ArchiveService' })
}

@@ -228,8 +236,13 @@
* Initializes downloadEvent handlers and archive trackfile backup interval.
*/
private installTriggers(): void {
downloadEvents.on('success', (dataObjectId, size) => {
downloadEvents.on('success', (dataObjectId, size, startTime, endTime) => {
this.logger.debug(`Download success event received for object: ${dataObjectId}`)
this.statsCollectingService.addDownloadJobStats({
start: startTime,
end: endTime,
size: size,
})
this.handleSuccessfulDownload(dataObjectId).catch((e) => {
this.logger.error(`Critical error on handleSuccessfulDownload: ${e.toString()}`)
process.exit(1)
@@ -244,6 +257,9 @@
this.logger.error(`Failed to upload archive trackfile backup to S3: ${e.toString()}`)
})
}, this.archiveTrackfileBackupFreqMinutes * 60_000)
setInterval(() => {
this.statsCollectingService.logSummaries()
}, this.statsLoggingInterval * 60_000)
}

/**
@@ -440,7 +456,8 @@
this.uploadQueueDir,
this.archivesTrackingService,
this.s3ConnectionHandler,
this.compressionService
this.compressionService,
this.statsCollectingService
),
])
// 2.2. If it's already tracked by archiveTrackingService (already uploaded): remove
@@ -611,7 +628,8 @@
batch.map((o) => o.id),
this.archivesTrackingService,
this.s3ConnectionHandler,
this.compressionService
this.compressionService,
this.statsCollectingService
)
uploadTasks.push(uploadTask)
}
6 changes: 4 additions & 2 deletions storage-node/src/services/archive/compression.ts
@@ -73,11 +73,13 @@ export class TarService extends CompressionService {
): Promise<void> {
try {
const useCompressProgram = this.getCompressProgramFlag(level || this.defaultCompressionLevel)
const baseDir = path.dirname(compressFilePaths[0])
const relativeFilePaths = compressFilePaths.map((f) => path.relative(baseDir, f))
const { stderr } = await execPromise(
// -c - compress
// -f - output to file
// -P - don't strip leading '/'s from file names
`tar -Pcf ${archiveFilePath} ${useCompressProgram} ${compressFilePaths.join(' ')}`
// -C - omit the path from file names (cd into the directory)
`tar -cf ${archiveFilePath} ${useCompressProgram} -C ${baseDir} ${relativeFilePaths.join(' ')}`
)
if (stderr) {
logger.warn(`tar process stderr: ${stderr}`)
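
An illustrative sketch (made-up paths, not from the commit) of what the new baseDir / relative-path logic achieves: archive entries are now stored relative to the uploads directory via tar's -C flag, instead of embedding absolute host paths as the old -P variant did.

import path from 'path'

// Hypothetical absolute paths of data objects queued for compression:
const compressFilePaths = ['/data/uploads/838001', '/data/uploads/838002']

const baseDir = path.dirname(compressFilePaths[0]) // '/data/uploads'
const relativeFilePaths = compressFilePaths.map((f) => path.relative(baseDir, f)) // ['838001', '838002']

// Resulting command shape (placeholders stand in for the archive path and compress-program flag):
// tar -cf <archiveFilePath> <useCompressProgram> -C /data/uploads 838001 838002
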
122 changes: 122 additions & 0 deletions storage-node/src/services/archive/stats.ts
@@ -0,0 +1,122 @@
import { Logger } from 'winston'
import logger from '../../services/logger'
import _ from 'lodash'

type SizeDurationJobStats = {
size: number
start: bigint
end: bigint
}

type DownloadJobStats = SizeDurationJobStats

type UploadJobStats = SizeDurationJobStats

type CompressionJobStats = SizeDurationJobStats & {
sizeAfter: number
}

export class StatsCollectingService {
private logger: Logger
private downloadJobsStats: DownloadJobStats[] = []
private uploadJobsStats: UploadJobStats[] = []
private compressionJobsStats: CompressionJobStats[] = []

constructor() {
this.logger = logger.child({ label: 'StatsCollectingService' })
}

public addDownloadJobStats(stats: DownloadJobStats): void {
this.downloadJobsStats.push(stats)
}

public addUploadJobStats(stats: UploadJobStats): void {
this.uploadJobsStats.push(stats)
}

public addCompressionJobStats(stats: CompressionJobStats): void {
this.compressionJobsStats.push(stats)
}

// Convert time in milliseconds to an `HH:MM:SS.XX` string
private humanizeDuration(durationMs: number): string {
const hours = Math.floor(durationMs / 1000 / 60 / 60)
const minutes = Math.floor((durationMs / 1000 / 60) % 60)
const seconds = Math.floor((durationMs / 1000) % 60)
const centiseconds = Math.floor((durationMs % 1000) / 10)
return `${hours.toString().padStart(2, '0')}:${minutes.toString().padStart(2, '0')}:${seconds
.toString()
.padStart(2, '0')}.${centiseconds.toString().padStart(2, '0')}`
}

private toMs(ns: bigint) {
return Number(ns / BigInt(1_000_000))
}

private countTotalDurationMs(source: SizeDurationJobStats[]): number {
if (source.length === 0) {
// Prevent division by 0
return 1
}

// Because jobs are executed in parallel, we "merge" start/end times
// when they overlap.
const jobs = _.sortBy(source, (job) => job.start)
let mergedRange: [bigint, bigint] = [jobs[0].start, jobs[0].end]
const mergedRanges: [bigint, bigint][] = []
for (const job of jobs) {
const start = job.start
const end = job.end
if (start <= mergedRange[1]) {
mergedRange[1] = end > mergedRange[1] ? end : mergedRange[1]
} else {
mergedRanges.push(mergedRange)
mergedRange = [start, end]
}
}
mergedRanges.push(mergedRange)

return this.toMs(mergedRanges.reduce((a, b) => a + (b[1] - b[0]), BigInt(0)))
}

private sizeDurationStats(source: SizeDurationJobStats[]): string {
const totalSize = source.reduce((a, b) => a + b.size, 0)
const totalDuration = this.countTotalDurationMs(source)
const numFiles = source.length

const totalSizeGB = (totalSize / 1_000_000_000).toFixed(2)
const totalDurationH = this.humanizeDuration(totalDuration)
const MBps = (totalSize / 1_000_000 / (totalDuration / 1000)).toFixed(2)

return `num_files=${numFiles}, total_size=${totalSizeGB}GB, total_duration=${totalDurationH}, avg_speed=${MBps}MB/s`
}

public logDownloadSummary(): void {
this.logger.info(`Download summary: ${this.sizeDurationStats(this.downloadJobsStats)}`)
}

public logUploadSummary(): void {
this.logger.info(`Upload summary: ${this.sizeDurationStats(this.uploadJobsStats)}`)
}

public logCompressionSummary(): void {
const totalSizeBefore = this.compressionJobsStats.reduce((a, b) => a + b.size, 0)
const totalSizeAfter = this.compressionJobsStats.reduce((a, b) => a + b.sizeAfter, 0)
const totalSizeReduction = totalSizeBefore - totalSizeAfter

const totalSizeAfterGB = (totalSizeAfter / 1_000_000_000).toFixed(2)
const reductionPercentage = ((totalSizeReduction / totalSizeBefore) * 100).toFixed(2)

this.logger.info(
`Compression summary: ${this.sizeDurationStats(
this.compressionJobsStats
)}, total_archives_size=${totalSizeAfterGB}GB, avg_size_reduction=${reductionPercentage}%`
)
}

public logSummaries(): void {
this.logDownloadSummary()
this.logUploadSummary()
this.logCompressionSummary()
}
}
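
A brief usage sketch with made-up numbers (assuming it sits next to stats.ts), showing why overlapping start/end ranges are merged: parallel jobs are measured against wall-clock time, so avg_speed reflects aggregate throughput rather than the sum of per-job durations.

import { StatsCollectingService } from './stats'

const stats = new StatsCollectingService()
const t0 = process.hrtime.bigint()

// Two 100 MB downloads that overlap in time (nanosecond timestamps, as used by the download events):
stats.addDownloadJobStats({ size: 100_000_000, start: t0, end: t0 + BigInt(10_000_000_000) }) // 0s..10s
stats.addDownloadJobStats({ size: 100_000_000, start: t0 + BigInt(5_000_000_000), end: t0 + BigInt(15_000_000_000) }) // 5s..15s

// The merged range covers 0s..15s, so the logged summary would be:
// num_files=2, total_size=0.20GB, total_duration=00:00:15.00, avg_speed=13.33MB/s
stats.logDownloadSummary()
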
66 changes: 59 additions & 7 deletions storage-node/src/services/archive/tasks.ts
@@ -8,6 +8,7 @@ import { IConnectionHandler } from '../s3/IConnectionHandler'
import { ArchivesTrackingService } from './tracking'
import { StorageClass } from '@aws-sdk/client-s3'
import { CompressionService } from './compression'
import { StatsCollectingService } from './stats'

/**
* Compresses provided files into an archive and removes them.
@@ -22,7 +23,8 @@ export class CompressFilesTask implements Task {
constructor(
private uploadsDirectory: string,
private dataObjectIds: string[],
private compressionService: CompressionService
private compressionService: CompressionService,
private statsCollectingService: StatsCollectingService
) {
this.archiveFileName = blake2AsHex(_.sortBy(this.dataObjectIds, (id) => parseInt(id)).join(',')).substring(2)
this.ext = this.compressionService.getExt()
@@ -39,6 +41,31 @@
return this.archiveFilePath
}

private async getPreCompressionSize(): Promise<number> {
const stats = await Promise.all(this.dataObjectPaths.map((p) => fsp.stat(p)))
return stats.reduce((a, b) => a + b.size, 0)
}

private async getPostCompressionSize(): Promise<number> {
const { size } = await fsp.stat(this.archiveFilePath)
return size
}

private async logCompressionStats(startTime: bigint): Promise<void> {
try {
const preCompressionSize = await this.getPreCompressionSize()
const postCompressionSize = await this.getPostCompressionSize()
this.statsCollectingService.addCompressionJobStats({
size: preCompressionSize,
sizeAfter: postCompressionSize,
start: startTime,
end: process.hrtime.bigint(),
})
} catch (e) {
logger.error(`Failed to get compression stats for archive ${this.archiveFilePath}: ${e.toString()}`)
}
}

private async verifyAndMoveArchive(): Promise<void> {
try {
await fsp.access(this.tmpArchiveFilePath, fsp.constants.W_OK | fsp.constants.R_OK)
@@ -69,8 +96,10 @@

public async execute(): Promise<void> {
try {
const startTime = process.hrtime.bigint()
await this.compressionService.compressFiles(this.dataObjectPaths, this.tmpArchiveFilePath)
await this.verifyAndMoveArchive()
await this.logCompressionStats(startTime)
await this.clenaup()
} catch (e) {
throw new Error(`Compression task failed: ${e.toString()}`)
@@ -90,19 +119,20 @@ export class UploadArchiveFileTask implements Task {
private archivesTrackingService: ArchivesTrackingService,
private connectionHandler: IConnectionHandler<StorageClass>,
private compressionService: CompressionService,
private statsCollectingService: StatsCollectingService,
private dataObjectIds?: string[]
) {}

public description(): string {
return `Uploading ${this.archiveFilePath} to S3 (key: ${this.objectKey})...`
}

public async getPackedFiles(): Promise<string[]> {
private async getPackedFiles(): Promise<string[]> {
const packedFiles = await this.compressionService.listFiles(this.archiveFilePath)
return packedFiles
}

public async cleanup(dataObjectIds: string[]): Promise<void> {
private async cleanup(dataObjectIds: string[]): Promise<void> {
const paths = [this.archiveFilePath, ...dataObjectIds.map((id) => path.join(this.uploadsDirectory, id))]
try {
await Promise.all(paths.map((p) => fsp.rm(p, { force: true })))
@@ -111,11 +141,26 @@
}
}

private async logUploadStats(startTime: bigint): Promise<void> {
try {
const { size } = await fsp.stat(this.archiveFilePath)
this.statsCollectingService.addUploadJobStats({
size,
start: startTime,
end: process.hrtime.bigint(),
})
} catch (e) {
logger.error(`Failed to get upload stats for archive ${this.archiveFilePath}: ${e.toString()}`)
}
}

public async execute(): Promise<void> {
const dataObjectIds = this.dataObjectIds || (await this.getPackedFiles())
try {
const startTime = process.hrtime.bigint()
await this.connectionHandler.uploadFileToRemoteBucket(this.objectKey, this.archiveFilePath)
await this.archivesTrackingService.track({ name: this.objectKey, dataObjectIds: dataObjectIds })
await this.logUploadStats(startTime)
await this.cleanup(dataObjectIds)
logger.info(`${this.archiveFilePath} successfully uploaded to S3!`)
} catch (e) {
@@ -133,15 +178,20 @@ export class CompressAndUploadTask implements Task {
private compressTask: CompressFilesTask
private uploadTask: UploadArchiveFileTask

// eslint-disable-next-line no-useless-constructor
constructor(
private uploadsDirectory: string,
private dataObjectIds: string[],
private archivesTrackingService: ArchivesTrackingService,
private connectionHandler: IConnectionHandler<StorageClass>,
private compressionService: CompressionService
private compressionService: CompressionService,
private statsCollectingService: StatsCollectingService
) {
this.compressTask = new CompressFilesTask(this.uploadsDirectory, this.dataObjectIds, this.compressionService)
this.compressTask = new CompressFilesTask(
this.uploadsDirectory,
this.dataObjectIds,
this.compressionService,
this.statsCollectingService
)
this.archiveFilePath = this.compressTask.getArchiveFilePath()
this.archiveFileName = path.basename(this.archiveFilePath)
this.uploadTask = new UploadArchiveFileTask(
@@ -150,7 +200,9 @@
this.uploadsDirectory,
this.archivesTrackingService,
this.connectionHandler,
this.compressionService
this.compressionService,
this.statsCollectingService,
this.dataObjectIds
)
}
