Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allows for distributed list generation #566

Merged
merged 3 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/platform/src/config/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import UpdateJourneysJob from '../journey/UpdateJourneysJob'
import ScheduledEntranceJob from '../journey/ScheduledEntranceJob'
import ScheduledEntranceOrchestratorJob from '../journey/ScheduledEntranceOrchestratorJob'
import ListRefreshJob from '../lists/ListRefreshJob'
import ListEvaluateUserJob from '../lists/ListEvaluateUserJob'

export const jobs = [
CampaignGenerateListJob,
Expand All @@ -38,6 +39,7 @@ export const jobs = [
JourneyDelayJob,
JourneyProcessJob,
JourneyStatsJob,
ListEvaluateUserJob,
ListRefreshJob,
ListPopulateJob,
ListStatsJob,
Expand Down
6 changes: 4 additions & 2 deletions apps/platform/src/config/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,19 @@ export const cacheDel = async (redis: Redis, key: string) => {
}

export const cacheIncr = async (redis: Redis, key: string, incr = 1, ttl?: number) => {
await redis.incrby(key, incr)
const val = await redis.incrby(key, incr)
if (ttl) {
await redis.expire(key, ttl)
}
return val
}

export const cacheDecr = async (redis: Redis, key: string, ttl?: number) => {
await redis.decr(key)
const val = await redis.decr(key)
if (ttl) {
await redis.expire(key, ttl)
}
return val
}

export { Redis }
8 changes: 7 additions & 1 deletion apps/platform/src/lists/List.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ export default class List extends Model {
is_visible!: boolean
refreshed_at?: Date | null
deleted_at?: Date
progress?: ListProgress
}

export type DynamicList = List & { rule: RuleTree }
export type ListProgress = {
complete: number
total: number
}

export type DynamicList = List & { rule_id: number, rule: RuleTree }

export class UserList extends Model {
user_id!: number
Expand Down
2 changes: 1 addition & 1 deletion apps/platform/src/lists/ListController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ router.use(projectRoleMiddleware('editor'))

router.get('/', async ctx => {
const searchSchema = SearchSchema('listUserSearchSchema', {
sort: 'id',
sort: 'updated_at',
direction: 'desc',
})
const params = extractQueryParams(ctx.query, searchSchema)
Expand Down
49 changes: 49 additions & 0 deletions apps/platform/src/lists/ListEvaluateUserJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import App from '../app'
import { cacheIncr } from '../config/redis'
import { Job } from '../queue'
import { getUser } from '../users/UserRepository'
import { DynamicList } from './List'
import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService'

interface ListEvaluateUserParams {
listId: number
userId: number
projectId: number
version: number
totalCount: number
}

export default class ListEvaluateUserJob extends Job {
static $name = 'list_evaluate_user_job'

static from(params: ListEvaluateUserParams): ListEvaluateUserJob {
return new this(params)
}

static async handler({ listId, userId, projectId, version, totalCount }: ListEvaluateUserParams) {

const list = await getList(listId, projectId) as DynamicList
if (!list) return

// Check to see if we are still evaluating the latest
// version of the list ruleset
if (list.version !== version) return

const evaluate = async () => {
const user = await getUser(userId, projectId)
if (!user) return
await evaluateUserList(user, list)
}

// No matter what always increment the progress
try {
await evaluate()
} finally {
const cacheKey = CacheKeys.populationProgress(list)
const count = await cacheIncr(App.main.redis, cacheKey, 1, 300)
if (count >= totalCount) {
await cleanupList(list)
}
}
}
}
3 changes: 0 additions & 3 deletions apps/platform/src/lists/ListPopulateJob.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Job } from '../queue'
import { DynamicList } from './List'
import { getList, populateList } from './ListService'
import ListStatsJob from './ListStatsJob'

interface ListPopulateParams {
listId: number
Expand All @@ -21,7 +20,5 @@ export default class ListPopulateJob extends Job {
if (!list) return

await populateList(list)

await ListStatsJob.from(listId, projectId, true).queue()
}
}
139 changes: 97 additions & 42 deletions apps/platform/src/lists/ListService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { UserEvent } from '../users/UserEvent'
import { User } from '../users/User'
import { check } from '../rules/RuleEngine'
import List, { DynamicList, ListCreateParams, ListUpdateParams, UserList } from './List'
import List, { DynamicList, ListCreateParams, ListProgress, ListUpdateParams, UserList } from './List'
import Rule, { RuleEvaluation, RuleTree } from '../rules/Rule'
import { PageParams } from '../core/searchParams'
import ListPopulateJob from './ListPopulateJob'
Expand All @@ -12,10 +12,18 @@ import { Chunker } from '../utilities'
import { getUserEventsForRules } from '../users/UserRepository'
import { DateRuleTypes, RuleResults, RuleWithEvaluationResult, checkRules, decompileRule, fetchAndCompileRule, getDateRuleType, mergeInsertRules, splitRuleTree } from '../rules/RuleService'
import { updateCampaignSendEnrollment } from '../campaigns/CampaignService'
import { cacheDecr, cacheIncr } from '../config/redis'
import { cacheDecr, cacheDel, cacheGet, cacheIncr, cacheSet } from '../config/redis'
import App from '../app'
import { RequestError } from '../core/errors'
import RuleError from '../rules/RuleError'
import ListEvaluateUserJob from './ListEvaluateUserJob'
import ListStatsJob from './ListStatsJob'

export const CacheKeys = {
memberCount: (list: List) => `list:${list.id}:${list.version}:count`,
populationProgress: (list: List) => `list:${list.id}:${list.version}:progress`,
populationTotal: (list: Pick<List, 'id' | 'version'>) => `list:${list.id}:${list.version}:total`,
}

export const pagedLists = async (params: PageParams, projectId: number) => {
const result = await List.search(
Expand Down Expand Up @@ -61,6 +69,9 @@ export const getList = async (id: number, projectId: number) => {
if (list) {
list.tags = await getTags(List.tableName, [list.id]).then(m => m.get(list.id))
if (list.rule_id) list.rule = await fetchAndCompileRule(list.rule_id)
if (list.state === 'loading') {
list.progress = await populationProgress(list)
}
}
return list
}
Expand Down Expand Up @@ -170,7 +181,10 @@ export const updateList = async (list: List, { tags, rule, published, ...params
}

// Start repopulation of the list if state is published
if (list.state !== 'draft') await ListPopulateJob.from(list.id, list.project_id).queue()
if (list.state !== 'draft') {
await updateListState(list.id, { state: 'loading' })
await ListPopulateJob.from(list.id, list.project_id).queue()
}
}

return await getList(list.id, list.project_id)
Expand All @@ -185,8 +199,6 @@ export const deleteList = async (id: number, projectId: number) => {
return await List.deleteById(id, qb => qb.where('project_id', projectId))
}

export const countKey = (list: List) => `list:${list.id}:${list.version}:count`

export const addUserToList = async (user: User | number, list: List, event?: UserEvent) => {
const userId = user instanceof User ? user.id : user
const resp = await UserList.query()
Expand All @@ -198,7 +210,7 @@ export const addUserToList = async (user: User | number, list: List, event?: Use
})
.onConflict(['user_id', 'list_id'])
.ignore()
if (resp?.[0]) await cacheIncr(App.main.redis, countKey(list))
if (resp?.[0]) await cacheIncr(App.main.redis, CacheKeys.memberCount(list))
return resp
}

Expand All @@ -208,7 +220,7 @@ export const removeUserFromList = async (user: User | number, list: List) => {
qb.where('user_id', userId)
.where('list_id', list.id),
)
if (count) await cacheDecr(App.main.redis, countKey(list))
if (count) await cacheDecr(App.main.redis, CacheKeys.memberCount(list))
return count
}

Expand Down Expand Up @@ -283,59 +295,102 @@ const scrollUserListForEvaluation = async ({
}
}

export const populateList = async (list: List) => {
const { id, version: oldVersion = 0 } = list
const version = oldVersion + 1
await updateListState(id, { state: 'loading', version })
export const evaluateUserList = async (user: User, list: DynamicList) => {
const rule = await fetchAndCompileRule(list.rule_id) as RuleTree
const { eventRules, userRules } = splitRuleTree(rule)

const scroll = User.scroll(q => q.where('project_id', list.project_id))
const parts: RuleWithEvaluationResult[] = []
const events = await getUserEventsForRules([user.id], eventRules)

// Collect matching user ids, insert in batches of 100
const userChunker = new Chunker<number>(async userIds => {
await UserList.query()
.insert(userIds.map(user_id => ({
list_id: list.id,
user_id,
version,
})))
.onConflict(['user_id', 'list_id'])
.merge(['version'])
}, 100)
for (const rule of eventRules) {
const result = check({
user: user.flatten(),
events: events.map(e => e.flatten()),
}, rule)

// Collect rule evaluations, insert in batches of 100
const ruleChunker = new Chunker<Partial<RuleEvaluation>>(async items => {
await RuleEvaluation.query()
.insert(items.map(({ user_id, rule_id, result }) => ({
user_id,
rule_id,
.insert({
rule_id: rule.id!,
user_id: user.id,
result,
})))
})
.onConflict(['user_id', 'rule_id'])
.merge(['result'])

parts.push({
...rule,
result,
})
}

const result = checkRules(user, rule, [...parts, ...userRules])

if (result) {
await UserList.query()
.insert({
list_id: list.id,
user_id: user.id,
version: list.version,
})
.onConflict(['user_id', 'list_id'])
.merge(['version'])
}
}

export const populateList = async (list: List) => {
const { id, version: oldVersion = 0 } = list
const version = oldVersion + 1
list = await updateListState(id, { state: 'loading', version })

// Set the total in cache so we can start to calculate progress
const count = await User.count(q => q.where('project_id', list.project_id))
await cacheSet<number>(App.main.redis, CacheKeys.populationTotal(list), count, 86400)

const stream = User.query()
.where('project_id', list.project_id)
.stream()

// Enqueue batches of 100 jobs at a time for efficiency
const userChunker = new Chunker<ListEvaluateUserJob>(async jobs => {
await App.main.queue.enqueueBatch(jobs)
}, 100)

await scrollUserListForEvaluation({
list,
scroll,
handleRule: async ({ rule_id, user_id, result }) => {
await ruleChunker.add({ rule_id, user_id, result })
},
handleEntry: async (user, result) => {
if (result) await userChunker.add(user.id)
},
})
for await (const user of stream) {
await userChunker.add(
ListEvaluateUserJob.from({
listId: list.id,
userId: user.id,
projectId: list.project_id,
version,
totalCount: count,
}),
)
}

// Insert any remaining chunks
await ruleChunker.flush()
await userChunker.flush()
}

export const populationProgress = async (list: List): Promise<ListProgress> => {
return {
complete: await cacheGet<number>(App.main.redis, CacheKeys.populationProgress(list)) ?? 0,
total: await cacheGet<number>(App.main.redis, CacheKeys.populationTotal(list)) ?? 0,
}
}

export const cleanupList = async ({ id, project_id, version }: List) => {

// Once list is regenerated, drop any users from previous version
await UserList.delete(qb => qb
.where('version', '<', version)
.where('list_id', list.id))
.where('list_id', id))

// Update list status to ready
await updateListState(id, { state: 'ready' })

// Clear cache values
await cacheDel(App.main.redis, CacheKeys.populationTotal({ id, version }))

await ListStatsJob.from(id, project_id, true).queue()
}

export const refreshList = async (list: List, types: DateRuleTypes) => {
Expand Down
9 changes: 6 additions & 3 deletions apps/platform/src/lists/ListStatsJob.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import App from '../app'
import { cacheDel, cacheGet, cacheIncr } from '../config/redis'
import { Job } from '../queue'
import { countKey, getList, listUserCount, updateListState } from './ListService'
import List from './List'
import { CacheKeys, getList, listUserCount } from './ListService'

interface ListStatsParams {
listId: number
Expand All @@ -26,7 +27,7 @@ export default class ListStatsJob extends Job {
if (!list) return

const redis = App.main.redis
const cacheKey = countKey(list)
const cacheKey = CacheKeys.memberCount(list)

let count = await cacheGet<number>(redis, cacheKey) ?? 0
if (!list?.users_count || reset) {
Expand All @@ -36,6 +37,8 @@ export default class ListStatsJob extends Job {
}

// Update the list with the new totals
await updateListState(list.id, { users_count: count })
await List.query()
.update({ users_count: count })
.where('id', listId)
}
}
Loading
Loading