diff --git a/apps/platform/src/config/queue.ts b/apps/platform/src/config/queue.ts index fb1f7a2f..429b014f 100644 --- a/apps/platform/src/config/queue.ts +++ b/apps/platform/src/config/queue.ts @@ -26,6 +26,7 @@ import UpdateJourneysJob from '../journey/UpdateJourneysJob' import ScheduledEntranceJob from '../journey/ScheduledEntranceJob' import ScheduledEntranceOrchestratorJob from '../journey/ScheduledEntranceOrchestratorJob' import ListRefreshJob from '../lists/ListRefreshJob' +import ListEvaluateUserJob from '../lists/ListEvaluateUserJob' export const jobs = [ CampaignGenerateListJob, @@ -38,6 +39,7 @@ export const jobs = [ JourneyDelayJob, JourneyProcessJob, JourneyStatsJob, + ListEvaluateUserJob, ListRefreshJob, ListPopulateJob, ListStatsJob, diff --git a/apps/platform/src/config/redis.ts b/apps/platform/src/config/redis.ts index 121d5998..a3d00a32 100644 --- a/apps/platform/src/config/redis.ts +++ b/apps/platform/src/config/redis.ts @@ -35,17 +35,19 @@ export const cacheDel = async (redis: Redis, key: string) => { } export const cacheIncr = async (redis: Redis, key: string, incr = 1, ttl?: number) => { - await redis.incrby(key, incr) + const val = await redis.incrby(key, incr) if (ttl) { await redis.expire(key, ttl) } + return val } export const cacheDecr = async (redis: Redis, key: string, ttl?: number) => { - await redis.decr(key) + const val = await redis.decr(key) if (ttl) { await redis.expire(key, ttl) } + return val } export { Redis } diff --git a/apps/platform/src/lists/List.ts b/apps/platform/src/lists/List.ts index 78ddfdbf..232f5f2c 100644 --- a/apps/platform/src/lists/List.ts +++ b/apps/platform/src/lists/List.ts @@ -17,9 +17,15 @@ export default class List extends Model { is_visible!: boolean refreshed_at?: Date | null deleted_at?: Date + progress?: ListProgress } -export type DynamicList = List & { rule: RuleTree } +export type ListProgress = { + complete: number + total: number +} + +export type DynamicList = List & { rule_id: number, rule: RuleTree } export class UserList extends Model { user_id!: number diff --git a/apps/platform/src/lists/ListController.ts b/apps/platform/src/lists/ListController.ts index 6904b89a..c25a65fd 100644 --- a/apps/platform/src/lists/ListController.ts +++ b/apps/platform/src/lists/ListController.ts @@ -18,7 +18,7 @@ router.use(projectRoleMiddleware('editor')) router.get('/', async ctx => { const searchSchema = SearchSchema('listUserSearchSchema', { - sort: 'id', + sort: 'updated_at', direction: 'desc', }) const params = extractQueryParams(ctx.query, searchSchema) diff --git a/apps/platform/src/lists/ListEvaluateUserJob.ts b/apps/platform/src/lists/ListEvaluateUserJob.ts new file mode 100644 index 00000000..d1071164 --- /dev/null +++ b/apps/platform/src/lists/ListEvaluateUserJob.ts @@ -0,0 +1,49 @@ +import App from '../app' +import { cacheIncr } from '../config/redis' +import { Job } from '../queue' +import { getUser } from '../users/UserRepository' +import { DynamicList } from './List' +import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService' + +interface ListEvaluateUserParams { + listId: number + userId: number + projectId: number + version: number + totalCount: number +} + +export default class ListEvaluateUserJob extends Job { + static $name = 'list_evaluate_user_job' + + static from(params: ListEvaluateUserParams): ListEvaluateUserJob { + return new this(params) + } + + static async handler({ listId, userId, projectId, version, totalCount }: ListEvaluateUserParams) { + + const list = await getList(listId, projectId) as DynamicList + if (!list) return + + // Check to see if we are still evaluating the latest + // version of the list ruleset + if (list.version !== version) return + + const evaluate = async () => { + const user = await getUser(userId, projectId) + if (!user) return + await evaluateUserList(user, list) + } + + // No matter what always increment the progress + try { + await evaluate() + } finally { + const cacheKey = CacheKeys.populationProgress(list) + const count = await cacheIncr(App.main.redis, cacheKey, 1, 300) + if (count >= totalCount) { + await cleanupList(list) + } + } + } +} diff --git a/apps/platform/src/lists/ListPopulateJob.ts b/apps/platform/src/lists/ListPopulateJob.ts index d792e894..615a8b25 100644 --- a/apps/platform/src/lists/ListPopulateJob.ts +++ b/apps/platform/src/lists/ListPopulateJob.ts @@ -1,7 +1,6 @@ import { Job } from '../queue' import { DynamicList } from './List' import { getList, populateList } from './ListService' -import ListStatsJob from './ListStatsJob' interface ListPopulateParams { listId: number @@ -21,7 +20,5 @@ export default class ListPopulateJob extends Job { if (!list) return await populateList(list) - - await ListStatsJob.from(listId, projectId, true).queue() } } diff --git a/apps/platform/src/lists/ListService.ts b/apps/platform/src/lists/ListService.ts index a6b806c1..652336db 100644 --- a/apps/platform/src/lists/ListService.ts +++ b/apps/platform/src/lists/ListService.ts @@ -1,7 +1,7 @@ import { UserEvent } from '../users/UserEvent' import { User } from '../users/User' import { check } from '../rules/RuleEngine' -import List, { DynamicList, ListCreateParams, ListUpdateParams, UserList } from './List' +import List, { DynamicList, ListCreateParams, ListProgress, ListUpdateParams, UserList } from './List' import Rule, { RuleEvaluation, RuleTree } from '../rules/Rule' import { PageParams } from '../core/searchParams' import ListPopulateJob from './ListPopulateJob' @@ -12,10 +12,18 @@ import { Chunker } from '../utilities' import { getUserEventsForRules } from '../users/UserRepository' import { DateRuleTypes, RuleResults, RuleWithEvaluationResult, checkRules, decompileRule, fetchAndCompileRule, getDateRuleType, mergeInsertRules, splitRuleTree } from '../rules/RuleService' import { updateCampaignSendEnrollment } from '../campaigns/CampaignService' -import { cacheDecr, cacheIncr } from '../config/redis' +import { cacheDecr, cacheDel, cacheGet, cacheIncr, cacheSet } from '../config/redis' import App from '../app' import { RequestError } from '../core/errors' import RuleError from '../rules/RuleError' +import ListEvaluateUserJob from './ListEvaluateUserJob' +import ListStatsJob from './ListStatsJob' + +export const CacheKeys = { + memberCount: (list: List) => `list:${list.id}:${list.version}:count`, + populationProgress: (list: List) => `list:${list.id}:${list.version}:progress`, + populationTotal: (list: Pick) => `list:${list.id}:${list.version}:total`, +} export const pagedLists = async (params: PageParams, projectId: number) => { const result = await List.search( @@ -61,6 +69,9 @@ export const getList = async (id: number, projectId: number) => { if (list) { list.tags = await getTags(List.tableName, [list.id]).then(m => m.get(list.id)) if (list.rule_id) list.rule = await fetchAndCompileRule(list.rule_id) + if (list.state === 'loading') { + list.progress = await populationProgress(list) + } } return list } @@ -170,7 +181,10 @@ export const updateList = async (list: List, { tags, rule, published, ...params } // Start repopulation of the list if state is published - if (list.state !== 'draft') await ListPopulateJob.from(list.id, list.project_id).queue() + if (list.state !== 'draft') { + await updateListState(list.id, { state: 'loading' }) + await ListPopulateJob.from(list.id, list.project_id).queue() + } } return await getList(list.id, list.project_id) @@ -185,8 +199,6 @@ export const deleteList = async (id: number, projectId: number) => { return await List.deleteById(id, qb => qb.where('project_id', projectId)) } -export const countKey = (list: List) => `list:${list.id}:${list.version}:count` - export const addUserToList = async (user: User | number, list: List, event?: UserEvent) => { const userId = user instanceof User ? user.id : user const resp = await UserList.query() @@ -198,7 +210,7 @@ export const addUserToList = async (user: User | number, list: List, event?: Use }) .onConflict(['user_id', 'list_id']) .ignore() - if (resp?.[0]) await cacheIncr(App.main.redis, countKey(list)) + if (resp?.[0]) await cacheIncr(App.main.redis, CacheKeys.memberCount(list)) return resp } @@ -208,7 +220,7 @@ export const removeUserFromList = async (user: User | number, list: List) => { qb.where('user_id', userId) .where('list_id', list.id), ) - if (count) await cacheDecr(App.main.redis, countKey(list)) + if (count) await cacheDecr(App.main.redis, CacheKeys.memberCount(list)) return count } @@ -283,59 +295,102 @@ const scrollUserListForEvaluation = async ({ } } -export const populateList = async (list: List) => { - const { id, version: oldVersion = 0 } = list - const version = oldVersion + 1 - await updateListState(id, { state: 'loading', version }) +export const evaluateUserList = async (user: User, list: DynamicList) => { + const rule = await fetchAndCompileRule(list.rule_id) as RuleTree + const { eventRules, userRules } = splitRuleTree(rule) - const scroll = User.scroll(q => q.where('project_id', list.project_id)) + const parts: RuleWithEvaluationResult[] = [] + const events = await getUserEventsForRules([user.id], eventRules) - // Collect matching user ids, insert in batches of 100 - const userChunker = new Chunker(async userIds => { - await UserList.query() - .insert(userIds.map(user_id => ({ - list_id: list.id, - user_id, - version, - }))) - .onConflict(['user_id', 'list_id']) - .merge(['version']) - }, 100) + for (const rule of eventRules) { + const result = check({ + user: user.flatten(), + events: events.map(e => e.flatten()), + }, rule) - // Collect rule evaluations, insert in batches of 100 - const ruleChunker = new Chunker>(async items => { await RuleEvaluation.query() - .insert(items.map(({ user_id, rule_id, result }) => ({ - user_id, - rule_id, + .insert({ + rule_id: rule.id!, + user_id: user.id, result, - }))) + }) .onConflict(['user_id', 'rule_id']) .merge(['result']) + + parts.push({ + ...rule, + result, + }) + } + + const result = checkRules(user, rule, [...parts, ...userRules]) + + if (result) { + await UserList.query() + .insert({ + list_id: list.id, + user_id: user.id, + version: list.version, + }) + .onConflict(['user_id', 'list_id']) + .merge(['version']) + } +} + +export const populateList = async (list: List) => { + const { id, version: oldVersion = 0 } = list + const version = oldVersion + 1 + list = await updateListState(id, { state: 'loading', version }) + + // Set the total in cache so we can start to calculate progress + const count = await User.count(q => q.where('project_id', list.project_id)) + await cacheSet(App.main.redis, CacheKeys.populationTotal(list), count, 86400) + + const stream = User.query() + .where('project_id', list.project_id) + .stream() + + // Enqueue batches of 100 jobs at a time for efficiency + const userChunker = new Chunker(async jobs => { + await App.main.queue.enqueueBatch(jobs) }, 100) - await scrollUserListForEvaluation({ - list, - scroll, - handleRule: async ({ rule_id, user_id, result }) => { - await ruleChunker.add({ rule_id, user_id, result }) - }, - handleEntry: async (user, result) => { - if (result) await userChunker.add(user.id) - }, - }) + for await (const user of stream) { + await userChunker.add( + ListEvaluateUserJob.from({ + listId: list.id, + userId: user.id, + projectId: list.project_id, + version, + totalCount: count, + }), + ) + } - // Insert any remaining chunks - await ruleChunker.flush() await userChunker.flush() +} + +export const populationProgress = async (list: List): Promise => { + return { + complete: await cacheGet(App.main.redis, CacheKeys.populationProgress(list)) ?? 0, + total: await cacheGet(App.main.redis, CacheKeys.populationTotal(list)) ?? 0, + } +} + +export const cleanupList = async ({ id, project_id, version }: List) => { // Once list is regenerated, drop any users from previous version await UserList.delete(qb => qb .where('version', '<', version) - .where('list_id', list.id)) + .where('list_id', id)) // Update list status to ready await updateListState(id, { state: 'ready' }) + + // Clear cache values + await cacheDel(App.main.redis, CacheKeys.populationTotal({ id, version })) + + await ListStatsJob.from(id, project_id, true).queue() } export const refreshList = async (list: List, types: DateRuleTypes) => { diff --git a/apps/platform/src/lists/ListStatsJob.ts b/apps/platform/src/lists/ListStatsJob.ts index a8360c48..daf42b50 100644 --- a/apps/platform/src/lists/ListStatsJob.ts +++ b/apps/platform/src/lists/ListStatsJob.ts @@ -1,7 +1,8 @@ import App from '../app' import { cacheDel, cacheGet, cacheIncr } from '../config/redis' import { Job } from '../queue' -import { countKey, getList, listUserCount, updateListState } from './ListService' +import List from './List' +import { CacheKeys, getList, listUserCount } from './ListService' interface ListStatsParams { listId: number @@ -26,7 +27,7 @@ export default class ListStatsJob extends Job { if (!list) return const redis = App.main.redis - const cacheKey = countKey(list) + const cacheKey = CacheKeys.memberCount(list) let count = await cacheGet(redis, cacheKey) ?? 0 if (!list?.users_count || reset) { @@ -36,6 +37,8 @@ export default class ListStatsJob extends Job { } // Update the list with the new totals - await updateListState(list.id, { users_count: count }) + await List.query() + .update({ users_count: count }) + .where('id', listId) } } diff --git a/apps/platform/src/queue/Queue.ts b/apps/platform/src/queue/Queue.ts index 1c179990..1238fa71 100644 --- a/apps/platform/src/queue/Queue.ts +++ b/apps/platform/src/queue/Queue.ts @@ -43,7 +43,7 @@ export default class Queue { } async enqueue(job: Job | EncodedJob): Promise { - logger.info(job instanceof Job ? job.toJSON() : job, 'queue:job:enqueued') + logger.trace(job instanceof Job ? job.toJSON() : job, 'queue:job:enqueued') await this.provider.enqueue(job) // Increment stats @@ -51,7 +51,7 @@ export default class Queue { } async enqueueBatch(jobs: EncodedJob[]): Promise { - logger.info({ count: jobs.length }, 'queue:job:enqueuedBatch') + logger.trace({ count: jobs.length }, 'queue:job:enqueuedBatch') await this.provider.enqueueBatch(jobs) // Increment stats @@ -71,7 +71,7 @@ export default class Queue { } async started(job: EncodedJob) { - logger.info(job, 'queue:job:started') + logger.trace(job, 'queue:job:started') } async errored(error: Error, job?: EncodedJob) { @@ -81,7 +81,7 @@ export default class Queue { } async completed(job: EncodedJob) { - logger.info(job, 'queue:job:completed') + logger.trace(job, 'queue:job:completed') } async start() { diff --git a/apps/platform/src/rules/RuleService.ts b/apps/platform/src/rules/RuleService.ts index e6237dde..f4dd4e6a 100644 --- a/apps/platform/src/rules/RuleService.ts +++ b/apps/platform/src/rules/RuleService.ts @@ -1,3 +1,5 @@ +import App from '../app' +import { cacheDel, cacheGet, cacheSet } from '../config/redis' import { ModelParams } from '../core/Model' import Project from '../projects/Project' import { User } from '../users/User' @@ -19,6 +21,10 @@ type RuleWithEvaluationId = Rule & EvaluationId type RuleTreeWithEvaluationId = RuleTree & EvaluationId export type RuleResults = { success: string[], failure: string[] } +const CacheKeys = { + ruleTree: (rootId: number) => `rule_tree:${rootId}`, +} + /** * For a given user and set of rules joined with evaluation results, * check if all rules are true. @@ -196,6 +202,8 @@ export const mergeInsertRules = async (rules: Rule[]) => { : await Rule.insert(rule) } + await cacheDel(App.main.redis, CacheKeys.ruleTree(root.id)) + return newItems } @@ -204,11 +212,17 @@ export const mergeInsertRules = async (rules: Rule[]) => { * into a nested tree structure. */ export const fetchAndCompileRule = async (rootId: number): Promise => { + + const cache = await cacheGet(App.main.redis, CacheKeys.ruleTree(rootId)) + if (cache) return cache + const root = await Rule.find(rootId) if (!root) return undefined const rules = await Rule.all(qb => qb.where('root_uuid', root!.uuid)) - return compileRule(root, rules) + const compiled = compileRule(root, rules) + await cacheSet(App.main.redis, CacheKeys.ruleTree(rootId), compiled, 3600) + return compiled } export const compileRule = (root: Rule, rules: Rule[]): RuleTree => { diff --git a/apps/ui/src/types.ts b/apps/ui/src/types.ts index cff8b54a..c2d43440 100644 --- a/apps/ui/src/types.ts +++ b/apps/ui/src/types.ts @@ -237,6 +237,10 @@ export type List = { rule?: WrapperRule users_count: number tags?: string[] + progress?: { + complete: number + total: number + } is_visible: boolean created_at: string updated_at: string diff --git a/apps/ui/src/views/users/ListDetail.tsx b/apps/ui/src/views/users/ListDetail.tsx index 029f421d..cb52f663 100644 --- a/apps/ui/src/views/users/ListDetail.tsx +++ b/apps/ui/src/views/users/ListDetail.tsx @@ -69,7 +69,7 @@ export default function ListDetail() { title={list.name} desc={ , + [t('state')]: , [t('type')]: snakeToTitle(list.type), [t('users_count')]: list.state === 'loading' ? <>– diff --git a/apps/ui/src/views/users/ListTable.tsx b/apps/ui/src/views/users/ListTable.tsx index 8c3201ca..509d1be8 100644 --- a/apps/ui/src/views/users/ListTable.tsx +++ b/apps/ui/src/views/users/ListTable.tsx @@ -17,15 +17,21 @@ interface ListTableParams { onSelectRow?: (list: List) => void } -export const ListTag = ({ state }: { state: ListState }) => { +export const ListTag = ({ state, progress }: Pick) => { const variant: Record = { draft: 'plain', loading: 'info', ready: 'success', } + const complete = progress?.complete ?? 0 + const total = progress?.total ?? 0 + const percent = total > 0 ? complete / total : 0 + const percentStr = percent.toLocaleString(undefined, { style: 'percent', minimumFractionDigits: 0 }) + return { (t) => t(state) } + {progress && ` (${percentStr})`} }