diff --git a/optimizer.config.example.js b/optimizer.config.example.js index 52f8f0d..da0598a 100644 --- a/optimizer.config.example.js +++ b/optimizer.config.example.js @@ -15,9 +15,6 @@ module.exports = { db: 0, // password: '123456', }, - revalidate: 300, - ttl: 24 * 60 * 60, - cleanSchedule: '0 2 * * *', urlParser: (url) => url, log: { accessLog: 'stdout', diff --git a/package.json b/package.json index c0e71d1..4b5d23e 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,6 @@ "monocle-ts": "^2.3.13", "morgan": "^1.10.0", "newtype-ts": "^0.3.5", - "node-cron": "^3.0.2", "object-hash": "^3.0.0", "sharp": "^0.32.6" }, diff --git a/src/bootstrap.ts b/src/bootstrap.ts index 18757e6..91d29f9 100644 --- a/src/bootstrap.ts +++ b/src/bootstrap.ts @@ -11,4 +11,3 @@ if (!fs.existsSync(ffmpegPath) || !fs.existsSync(ffprobePath)) { } require('./server') -require('./task') diff --git a/src/lib/cache.ts b/src/lib/cache.ts index 7616e54..d197046 100644 --- a/src/lib/cache.ts +++ b/src/lib/cache.ts @@ -11,6 +11,7 @@ import { CachaParams } from '@/types' import { Locker } from './locker' import redisClient from './redis' +import { delay } from './utils' const logger = Logger.get('cache') const getCacheFilePath = (hash: string) => { @@ -25,22 +26,11 @@ export class Cache { this.key = `image_cache:${hash(params)}` this.cacheLocker = new Locker(params) } - get = async (): Promise<[null] | [string, boolean]> => { + get = async (): Promise<[null] | [string, number]> => { const cached = await redisClient.hgetall(this.key) - const { timestamp, file } = cached - if (!timestamp || !file) return [null] - if (Date.now() - parseInt(timestamp) > config.ttl * 1000) { - redisClient.del(this.key) - fs.unlink(getCacheFilePath(file), () => undefined) - return [null] - } - const revalidate = - Date.now() - parseInt(timestamp) > config.revalidate * 1000 - if (!fs.existsSync(getCacheFilePath(file))) { - redisClient.del(this.key) - return [null] - } - return [getCacheFilePath(file), revalidate] + const { file, timestamp } = cached + if (!file) return [null] + return [getCacheFilePath(file), Date.now() - parseInt(timestamp)] } set = (data: PassThrough) => @@ -61,7 +51,6 @@ export class Cache { }, ) try { - const prevCached = await redisClient.hgetall(this.key) await Promise.all([ redisClient.hset(this.key, { file: fileHash, @@ -70,9 +59,6 @@ export class Cache { fsPromise.writeFile(getCacheFilePath(fileHash), data), ]) resolve() - if (prevCached && prevCached.file && prevCached.file !== fileHash) { - fs.unlink(getCacheFilePath(prevCached.file), () => undefined) - } } catch (err) { logger.error('Error while create image cache: ', err) reject(err) @@ -81,20 +67,49 @@ export class Cache { }) } -export const clean = async () => { - logger.time('clean cache cost') - const keys = await redisClient.keys('image_cache:*') - for (const key of keys) { - const cached = await redisClient.hgetall(key) - const { timestamp, file } = cached - if (!timestamp || !file) { - redisClient.del(key) - continue +type CacheStatus = 'hit' | 'miss' +export const getWithCache = async (options: { + cacheKey: any + fetcher: () => Promise<[string] | [null, PassThrough]> + callback: ( + cacheStatus: CacheStatus, + cachePath: string, + age: number, + ) => Promise +}) => { + const { cacheKey, fetcher, callback } = options + const cacheLocker = new Locker(cacheKey) + const cache = new Cache(cacheKey) + + const [cached, age] = await cache.get() + + const update = async () => { + if (await cacheLocker.isLocked()) return + const start = Date.now() + // cache miss + await cacheLocker.lock() + try { + const [error, data] = await fetcher() + if (error) throw new Error(error) + await cache.set(data) + logger.info(`process cost: ${Date.now() - start}ms`) + } catch (err) { + throw err + } finally { + await cacheLocker.unlock() } - if (Date.now() - parseInt(timestamp) > config.ttl * 1000) { - redisClient.del(key) - fs.unlink(getCacheFilePath(file), (err) => logger.error(err)) + } + + if (cached) { + // Cache hit + return callback('hit', cached, age) + } else { + update() + await delay(10) + while (await cacheLocker.isLocked()) { + await delay(10) } + const [cached, age] = await cache.get() + return callback('miss', cached, age) } - logger.timeEnd('clean cache cost') } diff --git a/src/lib/cacheWithRevalidation.ts b/src/lib/cacheWithRevalidation.ts deleted file mode 100644 index 7449060..0000000 --- a/src/lib/cacheWithRevalidation.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { PassThrough } from 'stream' - -import Logger from '@/lib/logger' - -import { Cache } from './cache' -import { Locker } from './locker' -import { delay } from './utils' - -const logger = Logger.get('cache manager') - -type CacheStatus = 'hit' | 'miss' | 'revalidate' -export const cacheWithRevalidation = async (options: { - cacheKey: any - revalidate: () => Promise<[string] | [null, PassThrough]> - callback: (cacheStatus: CacheStatus, cachePath: string) => Promise -}) => { - const { cacheKey, revalidate, callback } = options - const cacheLocker = new Locker(cacheKey) - const cache = new Cache(cacheKey) - - const [cached, needRevalidate] = await cache.get() - - const update = async () => { - if (await cacheLocker.isLocked()) return - const start = Date.now() - // revalidate or cache miss - await cacheLocker.lock() - try { - const [error, data] = await revalidate() - if (error) throw new Error(error) - await cache.set(data) - logger.info(`Updated cost: ${Date.now() - start}ms`) - } catch (err) { - throw err - } finally { - await cacheLocker.unlock() - } - } - - if (cached && !needRevalidate) { - // Cache hit and not revalidate - return callback('hit', cached) - } else if (cached && needRevalidate) { - // Cache hit and revalidate but has a running task - await callback('revalidate', cached) - return update() - } else { - // Cache miss but has a running task - update() - await delay(10) - while (await cacheLocker.isLocked()) { - await delay(10) - } - const [cached] = await cache.get() - return callback('miss', cached) - } -} diff --git a/src/server.ts b/src/server/index.ts similarity index 97% rename from src/server.ts rename to src/server/index.ts index 14dc41c..1157f54 100644 --- a/src/server.ts +++ b/src/server/index.ts @@ -2,11 +2,11 @@ import express from 'express' import fs from 'fs' import morgan from 'morgan' +import { config } from '@/lib/config' import Logger from '@/lib/logger' import * as Sentry from '@sentry/node' import * as Tracing from '@sentry/tracing' -import { config } from './lib/config' import animationRouter from './routes/animation' import imageRouter from './routes/image' diff --git a/src/routes/animation.ts b/src/server/routes/animation.ts similarity index 91% rename from src/routes/animation.ts rename to src/server/routes/animation.ts index 782f1b8..07709d5 100644 --- a/src/routes/animation.ts +++ b/src/server/routes/animation.ts @@ -4,7 +4,7 @@ import fs from 'fs' import { CreateReadStreamOptions } from 'fs/promises' import { PassThrough } from 'node:stream' -import { cacheWithRevalidation } from '@/lib/cacheWithRevalidation' +import { getWithCache } from '@/lib/cache' import { config } from '@/lib/config' import { D, E } from '@/lib/fp' import Logger from '@/lib/logger' @@ -13,11 +13,13 @@ const logger = Logger.get('animation optimize') const router = Router() +const videoTaskParamsDecoder = D.struct({ + url: D.string, + format: D.literal('mp4', 'webm'), +}) + const parseQuery = (query: any) => { - const params = D.struct({ - url: D.string, - format: D.literal('mp4', 'webm'), - }).decode(query) + const params = videoTaskParamsDecoder.decode(query) if (E.isLeft(params)) { return null } @@ -69,10 +71,10 @@ router.head('/', async (req, res) => { const cacheKey = { url, format } try { - await cacheWithRevalidation({ + await getWithCache({ cacheKey, - revalidate: revalidate(videoUrl.toString(), format), - callback: (cacheStatus, cachePath) => + fetcher: revalidate(videoUrl.toString(), format), + callback: (cacheStatus, cachePath, age) => new Promise((resolve) => { const stat = fs.statSync(cachePath) res.writeHead(200, { @@ -81,6 +83,7 @@ router.head('/', async (req, res) => { 'Content-Type': `video/${format}`, 'Cache-Control': 'public, max-age=31536000, must-revalidate', 'x-image-cache': cacheStatus.toUpperCase(), + age: `${age}`, }) res.end() logger.info(`[${cacheStatus.toUpperCase()}] ${url}, format:${format}`) @@ -114,9 +117,9 @@ router.get('/', async (req, res) => { const cacheKey = { url, format } try { - await cacheWithRevalidation({ + await getWithCache({ cacheKey, - revalidate: revalidate(videoUrl.toString(), format), + fetcher: revalidate(videoUrl.toString(), format), callback: (cacheStatus, cachePath) => new Promise((resolve) => { logger.info(`[${cacheStatus.toUpperCase()}] ${url}, format:${format}`) diff --git a/src/routes/image.ts b/src/server/routes/image.ts similarity index 70% rename from src/routes/image.ts rename to src/server/routes/image.ts index ff78290..c752232 100644 --- a/src/routes/image.ts +++ b/src/server/routes/image.ts @@ -5,30 +5,59 @@ import { NumberFromString } from 'io-ts-types' import { PassThrough } from 'node:stream' import { returnOriginalFormats, supportedFormats, supportedTargetFormats } from '@/consts' -import { cacheWithRevalidation } from '@/lib/cacheWithRevalidation' +import { getWithCache } from '@/lib/cache' import { config } from '@/lib/config' -import { D, O } from '@/lib/fp' +import { D, E, O } from '@/lib/fp' import http from '@/lib/http' import Logger from '@/lib/logger' import { optimizeImage } from '@/lib/optimizer' const logger = Logger.get('image optimize') -export const paramsDecoder = (params: any) => ({ - url: pipe(D.string.decode(params.url), O.fromEither, O.toUndefined), - width: pipe(NumberFromString.decode(params.w), O.fromEither, O.toUndefined), - height: pipe(NumberFromString.decode(params.h), O.fromEither, O.toUndefined), - quality: pipe( - NumberFromString.decode(params.q), - O.fromEither, - O.getOrElse(() => 75), - ), -}) +const paramsDecoder = (params: any) => + pipe( + pipe( + D.struct({ + url: D.string, + }), + D.intersect( + D.partial({ + w: D.string, + h: D.string, + q: D.string, + }), + ), + ).decode(params), + E.map((params) => ({ + url: pipe(O.some(params.url), O.toUndefined), + width: pipe( + NumberFromString.decode(params.w), + O.fromEither, + O.toUndefined, + ), + height: pipe( + NumberFromString.decode(params.h), + O.fromEither, + O.toUndefined, + ), + quality: pipe( + NumberFromString.decode(params.q), + O.fromEither, + O.getOrElse(() => 75), + ), + })), + ) const router = Router() router.get('/', async (req, res) => { const { query, headers } = req - const params = paramsDecoder(query) + const _resp = paramsDecoder(query) + if (E.isLeft(_resp)) { + res.writeHead(400) + return res.end('Bad Input') + } + const params = _resp.right + if (!params.url) { res.writeHead(400) return res.end('Missing url parameter') @@ -61,9 +90,9 @@ router.get('/', async (req, res) => { const cacheKey = { ...params, targetFormat } try { - await cacheWithRevalidation({ + await getWithCache({ cacheKey, - revalidate: async () => { + async fetcher() { const { data, headers: imageHeaders } = await http.get( config.urlParser(imageUrl.toString()), { @@ -89,12 +118,13 @@ router.get('/', async (req, res) => { const stream = transformer.pipe(new PassThrough()) return [null, stream] }, - callback: (cacheStatus, cachePath) => - new Promise((resolve) => { + callback(cacheStatus, cachePath, age) { + return new Promise((resolve) => { res.writeHead(200, { 'Content-Type': targetFormat, 'Cache-Control': 'public, max-age=31536000, must-revalidate', 'x-image-cache': cacheStatus.toUpperCase(), + age: `${age}`, }) logger.info( `[${cacheStatus.toUpperCase()}] ${params.url}, W:${ @@ -103,8 +133,12 @@ router.get('/', async (req, res) => { ) const data = fs.createReadStream(cachePath) data.pipe(res) - data.on('end', resolve) - }), + data.on('end', () => { + res.end() + resolve() + }) + }) + }, }) } catch (err) { logger.error( diff --git a/src/task.ts b/src/task.ts deleted file mode 100644 index 7f3d810..0000000 --- a/src/task.ts +++ /dev/null @@ -1,13 +0,0 @@ -import cron from 'node-cron' - -import Logger from '@/lib/logger' - -import { clean } from './lib/cache' -import { config } from './lib/config' - -const logger = Logger.get('tasks') -cron.schedule(config.cleanSchedule, async () => { - logger.info('Start clean cache...') - await clean() - logger.info('Clean cache done.') -}) diff --git a/yarn.lock b/yarn.lock index cad0447..2013463 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2619,7 +2619,6 @@ __metadata: monocle-ts: "npm:^2.3.13" morgan: "npm:^1.10.0" newtype-ts: "npm:^0.3.5" - node-cron: "npm:^3.0.2" nodemon: "npm:^2.0.19" object-hash: "npm:^3.0.0" prettier: "npm:^2.8.3" @@ -3390,15 +3389,6 @@ __metadata: languageName: node linkType: hard -"node-cron@npm:^3.0.2": - version: 3.0.2 - resolution: "node-cron@npm:3.0.2" - dependencies: - uuid: "npm:8.3.2" - checksum: 9b67d817ec5cb373a7b5676b5984b1c9624b40765d939cb3e7072f5c7ed6133100cf90824118ddc87edc25b632d4457b3843384cbf323a35562634b0a8343c05 - languageName: node - linkType: hard - "node-gyp@npm:latest": version: 10.0.1 resolution: "node-gyp@npm:10.0.1" @@ -4921,15 +4911,6 @@ __metadata: languageName: node linkType: hard -"uuid@npm:8.3.2": - version: 8.3.2 - resolution: "uuid@npm:8.3.2" - bin: - uuid: dist/bin/uuid - checksum: bcbb807a917d374a49f475fae2e87fdca7da5e5530820ef53f65ba1d12131bd81a92ecf259cc7ce317cbe0f289e7d79fdfebcef9bfa3087c8c8a2fa304c9be54 - languageName: node - linkType: hard - "v8-compile-cache-lib@npm:^3.0.1": version: 3.0.1 resolution: "v8-compile-cache-lib@npm:3.0.1"