Skip to content

Commit

Permalink
Allows for distributed list generation (#566)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushchris authored Dec 5, 2024
1 parent f05f099 commit 3867511
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 64 deletions.
2 changes: 2 additions & 0 deletions apps/platform/src/config/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import UpdateJourneysJob from '../journey/UpdateJourneysJob'
import ScheduledEntranceJob from '../journey/ScheduledEntranceJob'
import ScheduledEntranceOrchestratorJob from '../journey/ScheduledEntranceOrchestratorJob'
import ListRefreshJob from '../lists/ListRefreshJob'
import ListEvaluateUserJob from '../lists/ListEvaluateUserJob'

export const jobs = [
CampaignGenerateListJob,
Expand All @@ -38,6 +39,7 @@ export const jobs = [
JourneyDelayJob,
JourneyProcessJob,
JourneyStatsJob,
ListEvaluateUserJob,
ListRefreshJob,
ListPopulateJob,
ListStatsJob,
Expand Down
6 changes: 4 additions & 2 deletions apps/platform/src/config/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,19 @@ export const cacheDel = async (redis: Redis, key: string) => {
}

export const cacheIncr = async (redis: Redis, key: string, incr = 1, ttl?: number) => {
await redis.incrby(key, incr)
const val = await redis.incrby(key, incr)
if (ttl) {
await redis.expire(key, ttl)
}
return val
}

export const cacheDecr = async (redis: Redis, key: string, ttl?: number) => {
await redis.decr(key)
const val = await redis.decr(key)
if (ttl) {
await redis.expire(key, ttl)
}
return val
}

export { Redis }
8 changes: 7 additions & 1 deletion apps/platform/src/lists/List.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ export default class List extends Model {
is_visible!: boolean
refreshed_at?: Date | null
deleted_at?: Date
progress?: ListProgress
}

export type DynamicList = List & { rule: RuleTree }
export type ListProgress = {
complete: number
total: number
}

export type DynamicList = List & { rule_id: number, rule: RuleTree }

export class UserList extends Model {
user_id!: number
Expand Down
2 changes: 1 addition & 1 deletion apps/platform/src/lists/ListController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ router.use(projectRoleMiddleware('editor'))

router.get('/', async ctx => {
const searchSchema = SearchSchema('listUserSearchSchema', {
sort: 'id',
sort: 'updated_at',
direction: 'desc',
})
const params = extractQueryParams(ctx.query, searchSchema)
Expand Down
49 changes: 49 additions & 0 deletions apps/platform/src/lists/ListEvaluateUserJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import App from '../app'
import { cacheIncr } from '../config/redis'
import { Job } from '../queue'
import { getUser } from '../users/UserRepository'
import { DynamicList } from './List'
import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService'

interface ListEvaluateUserParams {
listId: number
userId: number
projectId: number
version: number
totalCount: number
}

export default class ListEvaluateUserJob extends Job {
static $name = 'list_evaluate_user_job'

static from(params: ListEvaluateUserParams): ListEvaluateUserJob {
return new this(params)
}

static async handler({ listId, userId, projectId, version, totalCount }: ListEvaluateUserParams) {

const list = await getList(listId, projectId) as DynamicList
if (!list) return

// Check to see if we are still evaluating the latest
// version of the list ruleset
if (list.version !== version) return

const evaluate = async () => {
const user = await getUser(userId, projectId)
if (!user) return
await evaluateUserList(user, list)
}

// No matter what always increment the progress
try {
await evaluate()
} finally {
const cacheKey = CacheKeys.populationProgress(list)
const count = await cacheIncr(App.main.redis, cacheKey, 1, 300)
if (count >= totalCount) {
await cleanupList(list)
}
}
}
}
3 changes: 0 additions & 3 deletions apps/platform/src/lists/ListPopulateJob.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Job } from '../queue'
import { DynamicList } from './List'
import { getList, populateList } from './ListService'
import ListStatsJob from './ListStatsJob'

interface ListPopulateParams {
listId: number
Expand All @@ -21,7 +20,5 @@ export default class ListPopulateJob extends Job {
if (!list) return

await populateList(list)

await ListStatsJob.from(listId, projectId, true).queue()
}
}
139 changes: 97 additions & 42 deletions apps/platform/src/lists/ListService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { UserEvent } from '../users/UserEvent'
import { User } from '../users/User'
import { check } from '../rules/RuleEngine'
import List, { DynamicList, ListCreateParams, ListUpdateParams, UserList } from './List'
import List, { DynamicList, ListCreateParams, ListProgress, ListUpdateParams, UserList } from './List'
import Rule, { RuleEvaluation, RuleTree } from '../rules/Rule'
import { PageParams } from '../core/searchParams'
import ListPopulateJob from './ListPopulateJob'
Expand All @@ -12,10 +12,18 @@ import { Chunker } from '../utilities'
import { getUserEventsForRules } from '../users/UserRepository'
import { DateRuleTypes, RuleResults, RuleWithEvaluationResult, checkRules, decompileRule, fetchAndCompileRule, getDateRuleType, mergeInsertRules, splitRuleTree } from '../rules/RuleService'
import { updateCampaignSendEnrollment } from '../campaigns/CampaignService'
import { cacheDecr, cacheIncr } from '../config/redis'
import { cacheDecr, cacheDel, cacheGet, cacheIncr, cacheSet } from '../config/redis'
import App from '../app'
import { RequestError } from '../core/errors'
import RuleError from '../rules/RuleError'
import ListEvaluateUserJob from './ListEvaluateUserJob'
import ListStatsJob from './ListStatsJob'

export const CacheKeys = {
memberCount: (list: List) => `list:${list.id}:${list.version}:count`,
populationProgress: (list: List) => `list:${list.id}:${list.version}:progress`,
populationTotal: (list: Pick<List, 'id' | 'version'>) => `list:${list.id}:${list.version}:total`,
}

export const pagedLists = async (params: PageParams, projectId: number) => {
const result = await List.search(
Expand Down Expand Up @@ -61,6 +69,9 @@ export const getList = async (id: number, projectId: number) => {
if (list) {
list.tags = await getTags(List.tableName, [list.id]).then(m => m.get(list.id))
if (list.rule_id) list.rule = await fetchAndCompileRule(list.rule_id)
if (list.state === 'loading') {
list.progress = await populationProgress(list)
}
}
return list
}
Expand Down Expand Up @@ -170,7 +181,10 @@ export const updateList = async (list: List, { tags, rule, published, ...params
}

// Start repopulation of the list if state is published
if (list.state !== 'draft') await ListPopulateJob.from(list.id, list.project_id).queue()
if (list.state !== 'draft') {
await updateListState(list.id, { state: 'loading' })
await ListPopulateJob.from(list.id, list.project_id).queue()
}
}

return await getList(list.id, list.project_id)
Expand All @@ -185,8 +199,6 @@ export const deleteList = async (id: number, projectId: number) => {
return await List.deleteById(id, qb => qb.where('project_id', projectId))
}

export const countKey = (list: List) => `list:${list.id}:${list.version}:count`

export const addUserToList = async (user: User | number, list: List, event?: UserEvent) => {
const userId = user instanceof User ? user.id : user
const resp = await UserList.query()
Expand All @@ -198,7 +210,7 @@ export const addUserToList = async (user: User | number, list: List, event?: Use
})
.onConflict(['user_id', 'list_id'])
.ignore()
if (resp?.[0]) await cacheIncr(App.main.redis, countKey(list))
if (resp?.[0]) await cacheIncr(App.main.redis, CacheKeys.memberCount(list))
return resp
}

Expand All @@ -208,7 +220,7 @@ export const removeUserFromList = async (user: User | number, list: List) => {
qb.where('user_id', userId)
.where('list_id', list.id),
)
if (count) await cacheDecr(App.main.redis, countKey(list))
if (count) await cacheDecr(App.main.redis, CacheKeys.memberCount(list))
return count
}

Expand Down Expand Up @@ -283,59 +295,102 @@ const scrollUserListForEvaluation = async ({
}
}

export const populateList = async (list: List) => {
const { id, version: oldVersion = 0 } = list
const version = oldVersion + 1
await updateListState(id, { state: 'loading', version })
export const evaluateUserList = async (user: User, list: DynamicList) => {
const rule = await fetchAndCompileRule(list.rule_id) as RuleTree
const { eventRules, userRules } = splitRuleTree(rule)

const scroll = User.scroll(q => q.where('project_id', list.project_id))
const parts: RuleWithEvaluationResult[] = []
const events = await getUserEventsForRules([user.id], eventRules)

// Collect matching user ids, insert in batches of 100
const userChunker = new Chunker<number>(async userIds => {
await UserList.query()
.insert(userIds.map(user_id => ({
list_id: list.id,
user_id,
version,
})))
.onConflict(['user_id', 'list_id'])
.merge(['version'])
}, 100)
for (const rule of eventRules) {
const result = check({
user: user.flatten(),
events: events.map(e => e.flatten()),
}, rule)

// Collect rule evaluations, insert in batches of 100
const ruleChunker = new Chunker<Partial<RuleEvaluation>>(async items => {
await RuleEvaluation.query()
.insert(items.map(({ user_id, rule_id, result }) => ({
user_id,
rule_id,
.insert({
rule_id: rule.id!,
user_id: user.id,
result,
})))
})
.onConflict(['user_id', 'rule_id'])
.merge(['result'])

parts.push({
...rule,
result,
})
}

const result = checkRules(user, rule, [...parts, ...userRules])

if (result) {
await UserList.query()
.insert({
list_id: list.id,
user_id: user.id,
version: list.version,
})
.onConflict(['user_id', 'list_id'])
.merge(['version'])
}
}

export const populateList = async (list: List) => {
const { id, version: oldVersion = 0 } = list
const version = oldVersion + 1
list = await updateListState(id, { state: 'loading', version })

// Set the total in cache so we can start to calculate progress
const count = await User.count(q => q.where('project_id', list.project_id))
await cacheSet<number>(App.main.redis, CacheKeys.populationTotal(list), count, 86400)

const stream = User.query()
.where('project_id', list.project_id)
.stream()

// Enqueue batches of 100 jobs at a time for efficiency
const userChunker = new Chunker<ListEvaluateUserJob>(async jobs => {
await App.main.queue.enqueueBatch(jobs)
}, 100)

await scrollUserListForEvaluation({
list,
scroll,
handleRule: async ({ rule_id, user_id, result }) => {
await ruleChunker.add({ rule_id, user_id, result })
},
handleEntry: async (user, result) => {
if (result) await userChunker.add(user.id)
},
})
for await (const user of stream) {
await userChunker.add(
ListEvaluateUserJob.from({
listId: list.id,
userId: user.id,
projectId: list.project_id,
version,
totalCount: count,
}),
)
}

// Insert any remaining chunks
await ruleChunker.flush()
await userChunker.flush()
}

export const populationProgress = async (list: List): Promise<ListProgress> => {
return {
complete: await cacheGet<number>(App.main.redis, CacheKeys.populationProgress(list)) ?? 0,
total: await cacheGet<number>(App.main.redis, CacheKeys.populationTotal(list)) ?? 0,
}
}

export const cleanupList = async ({ id, project_id, version }: List) => {

// Once list is regenerated, drop any users from previous version
await UserList.delete(qb => qb
.where('version', '<', version)
.where('list_id', list.id))
.where('list_id', id))

// Update list status to ready
await updateListState(id, { state: 'ready' })

// Clear cache values
await cacheDel(App.main.redis, CacheKeys.populationTotal({ id, version }))

await ListStatsJob.from(id, project_id, true).queue()
}

export const refreshList = async (list: List, types: DateRuleTypes) => {
Expand Down
9 changes: 6 additions & 3 deletions apps/platform/src/lists/ListStatsJob.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import App from '../app'
import { cacheDel, cacheGet, cacheIncr } from '../config/redis'
import { Job } from '../queue'
import { countKey, getList, listUserCount, updateListState } from './ListService'
import List from './List'
import { CacheKeys, getList, listUserCount } from './ListService'

interface ListStatsParams {
listId: number
Expand All @@ -26,7 +27,7 @@ export default class ListStatsJob extends Job {
if (!list) return

const redis = App.main.redis
const cacheKey = countKey(list)
const cacheKey = CacheKeys.memberCount(list)

let count = await cacheGet<number>(redis, cacheKey) ?? 0
if (!list?.users_count || reset) {
Expand All @@ -36,6 +37,8 @@ export default class ListStatsJob extends Job {
}

// Update the list with the new totals
await updateListState(list.id, { users_count: count })
await List.query()
.update({ users_count: count })
.where('id', listId)
}
}
Loading

0 comments on commit 3867511

Please sign in to comment.