Skip to content

Commit

Permalink
Merge pull request #5 from cboar/cluster-support
Browse files Browse the repository at this point in the history
Full cluster support
  • Loading branch information
cboar authored Oct 24, 2022
2 parents ba3cf44 + 4643220 commit 41754e3
Show file tree
Hide file tree
Showing 12 changed files with 2,252 additions and 3,355 deletions.
6 changes: 2 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
dist/
node_modules/
coverage/
.project
.mongodb
start
NOTES
dev/
.project
4,085 changes: 1,013 additions & 3,072 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
"eslint-config-airbnb-typescript": "^17.0.0",
"eslint-plugin-import": "^2.26.0",
"husky": "^8.0.1",
"jest": "^28.0.8",
"jest": "^29.2.1",
"semantic-release": "^19.0.5",
"ts-jest": "^28.0.8",
"ts-jest": "^29.0.3",
"typescript": "^4.8.2"
},
"dependencies": {
Expand Down
98 changes: 60 additions & 38 deletions src/CachedQuery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,35 +89,46 @@ class CachedQuery<
return this.buildQuery(full).countDocuments();
}

/**
* Stringifies the result object and stores it in cache.
*/
private async serializeAndCache(result: T[], cacheKey: string) {
const { redis } = this.context;
const { hash, config: { populate, expiry } } = this;
const { hash, config: { cacheCount, populate, expiry } } = this;
try {
const bson = serialize(result);
const docIds = result.map((doc) => String(doc._id));
const populatedIds = collectPopulatedIds(result, populate);
const allKey = `A:${hash}`;
// Cache result, and create keys used for tracking invalidations

const multi = redis.multi();
multi.del(cacheKey);
multi.hset(cacheKey, 'V', bson);
multi.hset(cacheKey, 'O', docIds.join(' '));
multi.hset(cacheKey, 'P', populatedIds.join(' '));
multi.sadd(allKey, cacheKey);
multi.expiregt(cacheKey, expiry);
multi.expiregt(allKey, expiry);
docIds.forEach((id) => {
multi.sadd(`O:${id}`, cacheKey);
multi.expiregt(`O:${id}`, expiry);
});
populatedIds.forEach((id) => {
multi.sadd(`P:${id}`, cacheKey);
multi.expiregt(`P:${id}`, expiry);
});
await multi.exec();
if (cacheCount === Infinity) {
multi.hset(cacheKey, 'N', result.length);
} else {
multi.hdel(cacheKey, 'N');
}
multi
.hset(cacheKey, 'V', bson)
.hset(cacheKey, 'O', docIds.join(' '))
.hset(cacheKey, 'P', populatedIds.join(' '))
.expiregt(cacheKey, expiry);

await Promise.all([
multi.exec(),
redis.pipeline()
.sadd(allKey, cacheKey)
.expiregt(allKey, expiry)
.exec(),
...docIds.flatMap((id) => (
redis.pipeline()
.sadd(`O:${id}`, cacheKey)
.expiregt(`O:${id}`, expiry)
.exec()
)),
...populatedIds.flatMap((id) => (
redis.pipeline()
.sadd(`P:${id}`, cacheKey)
.expiregt(`P:${id}`, expiry)
.exec()
)),
]);
} catch (err) {
// logger.warn({ err, tag: 'CACHE_REDIS_SET', cacheKey, result, }, 'Failed to set value');
}
Expand Down Expand Up @@ -152,7 +163,7 @@ class CachedQuery<
* unique queries do not invalidate upon document insert, that event
* would not be detected for invalidation. */
if (result.length > 0 || !unique) {
this.serializeAndCache(result, cacheKey);
await this.serializeAndCache(result, cacheKey);
}
}
if (filter) result = result.filter(filter);
Expand All @@ -170,49 +181,60 @@ class CachedQuery<
const { cacheCount } = this.config;
/* If cacheCount is infinity, we know all the documents matching the query
* are already stored on cache. We already have to grab and splice it,
* so we might as well use the array length instead of another lookup.
*/
* so we might as well use the array length instead of another lookup. */
if (cacheCount === Infinity) {
const { skip, limit } = opts.exec;
// note: if applicable, the filter func is already applied to fullResult.
const fullResult = await this.exec(opts.merged({ skip: 0, limit: undefined }));
const result = skipAndLimit(fullResult, skip, limit);
return [
result,
fullResult.length,
];
return [result, fullResult.length];
}
return Promise.all([

const [result, { count, save: saveCount }] = await Promise.all([
this.exec(opts),
this.count(opts),
this.fetchCount(opts),
]);
if (saveCount) await saveCount();
return [result, count];
}

async count(input: InputExecOpts<T, P>) {
const opts = this.parseOpts(input);
const { redis } = this.context;
const { key: cacheKey, exec: { filter, skipCache = false } } = opts;
const { filter } = opts.exec;
if (filter) {
const fullResult = await this.exec(opts.merged({ skip: 0, limit: undefined }));
return fullResult.length;
}
let count;
const { count, save } = await this.fetchCount(opts);
if (save) await save();
return count;
}

/**
* This method is used to allow more flexible timing of when the count is written to redis.
* When used with `count()`, we save right away.
* When used with `execWithCount()`, we first await the completion of `exec`. */
private async fetchCount(input: InputExecOpts<T, P>) {
const opts = this.parseOpts(input);
const { redis } = this.context;
const { key: cacheKey, exec: { skipCache } } = opts;
if (!skipCache) {
try {
count = await redis.hget(cacheKey, 'count');
const count = await redis.hget(cacheKey, 'N');
if (count) return { count: parseInt(count, 10) };
} catch (err) {
// logger.warn({ err, tag: 'CACHE_REDIS_GET', cacheKey }, 'Failed to HGET value');
}
}
if (!count) {
count = await this.countMongo(opts.fresh({}));
const count = await this.countMongo(opts.fresh({}));
async function save() {
try {
await redis.hset(cacheKey, 'count', count);
await redis.hset(cacheKey, 'N', count);
} catch (err) {
// logger.warn({ err, tag: 'CACHE_REDIS_SET', cacheKey }, 'Failed to set value');
}
}
return typeof count === 'number' ? count : parseInt(count, 10);
return { count, save };
}

private parseOpts(input: InputExecOpts<T, P>) {
Expand Down
2 changes: 1 addition & 1 deletion src/CachedQuery/invalidation/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class PopulatedInvalidationInfo {
const { cachedQuery, selectInclusive, selectPaths } = this;
const { after } = doc;
if (wasProjectionModified(selectInclusive, selectPaths, modified)) {
return { set: `P:${String(after._id)}`, filter: cachedQuery.hash };
return { set: `P:${String(after._id)}`, filter: `Q:${cachedQuery.hash}` };
}
return null;
}
Expand Down
49 changes: 33 additions & 16 deletions src/CachedQuery/invalidation/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,45 @@ export default class InvalidationHandler {

async doInvalidations(collected: CollectedInvalidations) {
const { keys, sets } = collected;
if ((!keys || !keys.length) && (!sets || !sets.length)) return;

const { redis } = this.context;
const promises: Promise<string[] | 0 | 1>[] = [];
const promises: Promise<void>[] = [];
if (keys && keys.length) {
promises.push(...keys.map((key) => redis.delQuery(key)));
promises.push(...keys.map((key) => this.delQuery(key)));
}
if (sets && sets.length) {
promises.push(...sets.map(({ set, filter }) => redis.delQueriesIn(set, filter)));
promises.push(...sets.map(({ set, filter }) => this.delQueriesIn(set, filter)));
}
const result = await Promise.allSettled(promises);
await Promise.allSettled(promises);
}

const { keysInvalidated } = this;
result.forEach((settled, idx) => {
if (settled.status === 'rejected') return;
const res = settled.value;
if (res === 1) {
keysInvalidated.push(keys![idx]!);
} else if (Array.isArray(res) && res.length) {
keysInvalidated.push(...res);
}
private async delQuery(qkey: string) {
const { redis } = this.context;
const [[, docIds], [, populatedIds]] = await redis.multi()
.hget(qkey, 'O')
.hget(qkey, 'P')
.del(qkey)
.exec() as [[unknown, string | null], [unknown, string | null]];
if (!docIds) return;
this.keysInvalidated.push(qkey);
const allKey = `A:${qkey.substring(2, 18)}`;
const promises = [
redis.srem(allKey, qkey),
];
docIds.split(' ').forEach((id) => {
promises.push(redis.srem(`O:${id}`, qkey));
});
populatedIds?.split(' ').forEach((id) => {
promises.push(redis.srem(`P:${id}`, qkey));
});
await Promise.all(promises);
}

private async delQueriesIn(setKey: string, filter?: string) {
const { redis } = this.context;
let keys = await redis.smembers(setKey);
if (filter) {
keys = keys.filter((key) => key.startsWith(filter));
}
await Promise.all(keys.map((key) => this.delQuery(key)));
}

private getInvalidationInfos(model: string) {
Expand Down
58 changes: 4 additions & 54 deletions src/mondis.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type Redis from 'ioredis';
import type { Result } from 'ioredis';
import type { Redis, Cluster, Result } from 'ioredis';
import type { Mongoose } from 'mongoose';
import type { CachedQueryConfig } from './CachedQuery/config';
import CachedQuery from './CachedQuery';
Expand All @@ -16,7 +15,7 @@ declare module 'ioredis' {
}

type MondisConfiguration<Q> = {
redis?: Redis;
redis?: Redis | Cluster;
mongoose?: Mongoose;
queries?: Q;
};
Expand All @@ -42,55 +41,6 @@ const commands = {
end
`,
},

delQuery: {
numberOfKeys: 1,
lua: `
local qkey = KEYS[1]
local docIds = redis.call("HGET", qkey, "O")
if docIds == false then
return 0 end
for key in string.gmatch(docIds, "%S+") do
redis.call("SREM", "O:"..key, qkey)
end
local populatedIds = redis.call("HGET", qkey, "P")
for key in string.gmatch(populatedIds, "%S+") do
redis.call("SREM", "P:"..key, qkey)
end
local allKey = "A:"..string.sub(qkey, 3, 18)
redis.call("SREM", allKey, qkey)
redis.call("DEL", qkey)
return 1
`,
},

delQueriesIn: {
numberOfKeys: 1,
lua: `
local result = {}
local filter = ARGV[1] and ("^Q:"..ARGV[1])
local keys = redis.call("SMEMBERS", KEYS[1])
for _, qkey in ipairs(keys) do
if filter == nil or string.find(qkey, filter) ~= nil then
local docIds = redis.call("HGET", qkey, "O")
if docIds ~= false then
for key in string.gmatch(docIds, "%S+") do
redis.call("SREM", "O:"..key, qkey)
end
local populatedIds = redis.call("HGET", qkey, "P")
for key in string.gmatch(populatedIds, "%S+") do
redis.call("SREM", "P:"..key, qkey)
end
local allKey = "A:"..string.sub(qkey, 3, 18)
redis.call("SREM", allKey, qkey)
redis.call("DEL", qkey)
table.insert(result, qkey)
end
end
end
return result
`,
},
};

function buildCachedQueryMap<Q>(mondis: Mondis, input: unknown) {
Expand All @@ -104,7 +54,7 @@ function buildCachedQueryMap<Q>(mondis: Mondis, input: unknown) {
}

class Mondis<Q = {}> {
private _redis?: Redis;
private _redis?: Redis | Cluster;
private _mongoose?: Mongoose;
readonly invalidator: InvalidationHandler;
readonly rehydrator: RehydrationHandler;
Expand All @@ -117,7 +67,7 @@ class Mondis<Q = {}> {
this.init(config);
}

init(clients: { redis?: Redis, mongoose?: Mongoose }) {
init(clients: { redis?: Redis | Cluster, mongoose?: Mongoose }) {
const { redis, mongoose } = clients;
if (redis) {
Object.entries(commands).forEach(([name, conf]) => {
Expand Down
Loading

0 comments on commit 41754e3

Please sign in to comment.