Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔑 feat: Implement TTL Mgmt. for In-Memory Keyv Stores #5127

Merged
merged 1 commit into from
Dec 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 162 additions & 12 deletions api/cache/getLogStores.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,51 @@ const { math, isEnabled } = require('~/server/utils');
const keyvRedis = require('./keyvRedis');
const keyvMongo = require('./keyvMongo');

const { BAN_DURATION, USE_REDIS } = process.env ?? {};
const { BAN_DURATION, USE_REDIS, DEBUG_MEMORY_CACHE } = process.env ?? {};

const duration = math(BAN_DURATION, 7200000);
const isRedisEnabled = isEnabled(USE_REDIS);
const debugMemoryCache = isEnabled(DEBUG_MEMORY_CACHE);

const createViolationInstance = (namespace) => {
const config = isEnabled(USE_REDIS) ? { store: keyvRedis } : { store: violationFile, namespace };
const config = isRedisEnabled ? { store: keyvRedis } : { store: violationFile, namespace };
return new Keyv(config);
};

// Serve cache from memory so no need to clear it on startup/exit
const pending_req = isEnabled(USE_REDIS)
const pending_req = isRedisEnabled
? new Keyv({ store: keyvRedis })
: new Keyv({ namespace: 'pending_req' });

const config = isEnabled(USE_REDIS)
const config = isRedisEnabled
? new Keyv({ store: keyvRedis })
: new Keyv({ namespace: CacheKeys.CONFIG_STORE });

const roles = isEnabled(USE_REDIS)
const roles = isRedisEnabled
? new Keyv({ store: keyvRedis })
: new Keyv({ namespace: CacheKeys.ROLES });

const audioRuns = isEnabled(USE_REDIS)
const audioRuns = isRedisEnabled
? new Keyv({ store: keyvRedis, ttl: Time.TEN_MINUTES })
: new Keyv({ namespace: CacheKeys.AUDIO_RUNS, ttl: Time.TEN_MINUTES });

const messages = isEnabled(USE_REDIS)
? new Keyv({ store: keyvRedis, ttl: Time.FIVE_MINUTES })
: new Keyv({ namespace: CacheKeys.MESSAGES, ttl: Time.FIVE_MINUTES });
const messages = isRedisEnabled
? new Keyv({ store: keyvRedis, ttl: Time.ONE_MINUTE })
: new Keyv({ namespace: CacheKeys.MESSAGES, ttl: Time.ONE_MINUTE });

const tokenConfig = isEnabled(USE_REDIS)
const tokenConfig = isRedisEnabled
? new Keyv({ store: keyvRedis, ttl: Time.THIRTY_MINUTES })
: new Keyv({ namespace: CacheKeys.TOKEN_CONFIG, ttl: Time.THIRTY_MINUTES });

const genTitle = isEnabled(USE_REDIS)
const genTitle = isRedisEnabled
? new Keyv({ store: keyvRedis, ttl: Time.TWO_MINUTES })
: new Keyv({ namespace: CacheKeys.GEN_TITLE, ttl: Time.TWO_MINUTES });

const modelQueries = isEnabled(process.env.USE_REDIS)
? new Keyv({ store: keyvRedis })
: new Keyv({ namespace: CacheKeys.MODEL_QUERIES });

const abortKeys = isEnabled(USE_REDIS)
const abortKeys = isRedisEnabled
? new Keyv({ store: keyvRedis })
: new Keyv({ namespace: CacheKeys.ABORT_KEYS, ttl: Time.TEN_MINUTES });

Expand Down Expand Up @@ -88,6 +90,154 @@ const namespaces = {
[CacheKeys.MESSAGES]: messages,
};

/**
* Gets all cache stores that have TTL configured
* @returns {Keyv[]}
*/
function getTTLStores() {
return Object.values(namespaces).filter((store) =>
store instanceof Keyv &&
typeof store.opts?.ttl === 'number' &&
store.opts.ttl > 0,
);
}

/**
* Clears entries older than the cache's TTL
* @param {Keyv} cache
*/
async function clearExpiredFromCache(cache) {
if (!cache?.opts?.store?.entries) {
return;
}

const ttl = cache.opts.ttl;
if (!ttl) {
return;
}

const expiryTime = Date.now() - ttl;
let cleared = 0;

// Get all keys first to avoid modification during iteration
const keys = Array.from(cache.opts.store.keys());

for (const key of keys) {
try {
const raw = cache.opts.store.get(key);
if (!raw) {continue;}

const data = cache.opts.deserialize(raw);
// Check if the entry is older than TTL
if (data?.expires && data.expires <= expiryTime) {
const deleted = await cache.opts.store.delete(key);
if (!deleted) {
debugMemoryCache && console.warn(`[Cache] Error deleting entry: ${key} from ${cache.opts.namespace}`);
continue;
}
cleared++;
}
} catch (error) {
debugMemoryCache && console.log(`[Cache] Error processing entry from ${cache.opts.namespace}:`, error);
const deleted = await cache.opts.store.delete(key);
if (!deleted) {
debugMemoryCache && console.warn(`[Cache] Error deleting entry: ${key} from ${cache.opts.namespace}`);
continue;
}
cleared++;
}
}

if (cleared > 0) {
debugMemoryCache && console.log(`[Cache] Cleared ${cleared} entries older than ${ttl}ms from ${cache.opts.namespace}`);
}
}

const auditCache = () => {
const ttlStores = getTTLStores();
console.log('[Cache] Starting audit');

ttlStores.forEach(store => {
if (!store?.opts?.store?.entries) {
return;
}

console.log(`[Cache] ${store.opts.namespace} entries:`, {
count: store.opts.store.size,
ttl: store.opts.ttl,
keys: Array.from(store.opts.store.keys()),
entriesWithTimestamps: Array.from(store.opts.store.entries())
.map(([key, value]) => ({
key,
value,
})),
});
});
};

/**
* Clears expired entries from all TTL-enabled stores
*/
async function clearAllExpiredFromCache() {
const ttlStores = getTTLStores();
await Promise.all(ttlStores.map(store => clearExpiredFromCache(store)));

// Force garbage collection if available (Node.js with --expose-gc flag)
if (global.gc) {
global.gc();
}
}

if (!isRedisEnabled) {
/** @type {Set<NodeJS.Timeout>} */
const cleanupIntervals = new Set();

// Clear expired entries every 30 seconds
const cleanup = setInterval(() => {
clearAllExpiredFromCache();
}, Time.THIRTY_SECONDS);

cleanupIntervals.add(cleanup);

if (debugMemoryCache) {
const monitor = setInterval(() => {
const ttlStores = getTTLStores();
const memory = process.memoryUsage();
const totalSize = ttlStores.reduce((sum, store) => sum + (store.opts?.store?.size ?? 0), 0);

console.log('[Cache] Memory usage:', {
heapUsed: `${(memory.heapUsed / 1024 / 1024).toFixed(2)} MB`,
heapTotal: `${(memory.heapTotal / 1024 / 1024).toFixed(2)} MB`,
rss: `${(memory.rss / 1024 / 1024).toFixed(2)} MB`,
external: `${(memory.external / 1024 / 1024).toFixed(2)} MB`,
totalCacheEntries: totalSize,
});

auditCache();
}, Time.ONE_MINUTE);

cleanupIntervals.add(monitor);
}

const dispose = () => {
debugMemoryCache && console.log('[Cache] Cleaning up and shutting down...');
cleanupIntervals.forEach(interval => clearInterval(interval));
cleanupIntervals.clear();

// One final cleanup before exit
clearAllExpiredFromCache().then(() => {
debugMemoryCache && console.log('[Cache] Final cleanup completed');
process.exit(0);
});
};

// Handle various termination signals
process.on('SIGTERM', dispose);
process.on('SIGINT', dispose);
process.on('SIGQUIT', dispose);
process.on('SIGHUP', dispose);
}

/**
* Returns the keyv cache specified by type.
* If an invalid type is passed, an error will be thrown.
Expand Down
Loading