Skip to content

Commit

Permalink
Improve campaign send performance (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushchris authored Nov 9, 2024
1 parent 81a6828 commit 9a4f6da
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 47 deletions.
13 changes: 12 additions & 1 deletion apps/platform/src/campaigns/CampaignEnqueueSendsJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { CampaignJobParams } from './Campaign'
import { chunk } from '../utilities'
import App from '../app'
import { acquireLock, releaseLock } from '../core/Lock'
import { getProvider } from '../providers/ProviderRepository'

export default class CampaignEnqueueSendsJob extends Job {
static $name = 'campaign_enqueue_sends_job'
Expand All @@ -20,8 +21,18 @@ export default class CampaignEnqueueSendsJob extends Job {
const acquired = await acquireLock({ key, timeout: 300 })
if (!acquired) return

// If we are using redis, we can include throttled sends
// because they are deduped based on jobId. Not available in other
// queues
const includeThrottled = App.main.env.queue.driver === 'redis'

// Only enqueue the maximum that can be sent for the interval
// this job runs (every minute)
const provider = await getProvider(campaign.provider_id)
const ratePerMinute = provider?.ratePer('minute')

// Anything that is ready to be sent, enqueue for sending
const query = campaignSendReadyQuery(campaign.id)
const query = campaignSendReadyQuery(campaign.id, includeThrottled, ratePerMinute)
await chunk<{ user_id: number, send_id: number }>(query, 100, async (items) => {
const jobs = items.map(({ user_id, send_id }) => sendCampaignJob({ campaign, user: user_id, send_id }))
await App.main.queue.enqueueBatch(jobs)
Expand Down
34 changes: 18 additions & 16 deletions apps/platform/src/campaigns/CampaignService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,15 @@ export const getCampaignUsers = async (id: number, params: PageParams, projectId
)
}

interface SendCampaign {
campaign: Campaign
user: User | number
event?: UserEvent | number
send_id?: number
reference_type?: CampaignSendReferenceType
reference_id?: string
}

export const triggerCampaignSend = async ({ campaign, user, event, send_id, reference_type, reference_id }: SendCampaign) => {
const userId = user instanceof User ? user.id : user
const eventId = event instanceof UserEvent ? event?.id : event
Expand Down Expand Up @@ -208,15 +217,6 @@ export const triggerCampaignSend = async ({ campaign, user, event, send_id, refe
})
}

interface SendCampaign {
campaign: Campaign
user: User | number
event?: UserEvent | number
send_id?: number
reference_type?: CampaignSendReferenceType
reference_id?: string
}

export const sendCampaignJob = ({ campaign, user, event, send_id, reference_type, reference_id }: SendCampaign): EmailJob | TextJob | PushJob | WebhookJob => {

// TODO: Might also need to check for unsubscribe in here since we can
Expand Down Expand Up @@ -244,10 +244,6 @@ export const sendCampaignJob = ({ campaign, user, event, send_id, reference_type
return job
}

export const sendCampaign = async (data: SendCampaign): Promise<void> => {
await sendCampaignJob(data).queue()
}

interface UpdateSendStateParams {
campaign: Campaign | number
user: User | number
Expand Down Expand Up @@ -305,12 +301,18 @@ export const generateSendList = async (campaign: SentCampaign) => {
await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' })
}

export const campaignSendReadyQuery = (campaignId: number) => {
return CampaignSend.query()
export const campaignSendReadyQuery = (
campaignId: number,
includeThrottled = false,
limit?: number,
) => {
const query = CampaignSend.query()
.where('campaign_sends.send_at', '<=', CampaignSend.raw('NOW()'))
.where('campaign_sends.state', 'pending')
.where('campaign_sends.state', includeThrottled ? ['pending', 'throttled'] : ['pending'])
.where('campaign_id', campaignId)
.select('user_id', 'campaign_sends.id AS send_id')
if (limit) query.limit(limit)
return query
}

export const checkStalledSends = (campaignId: number) => {
Expand Down
18 changes: 1 addition & 17 deletions apps/platform/src/campaigns/__tests__/CampaignService.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import App from '../../app'
import EmailJob from '../../providers/email/EmailJob'
import { RequestError } from '../../core/errors'
import { addUserToList, createList } from '../../lists/ListService'
import { createSubscription, subscribe, subscribeAll } from '../../subscriptions/SubscriptionService'
import { User } from '../../users/User'
import { uuid } from '../../utilities'
import Campaign, { CampaignSend, SentCampaign } from '../Campaign'
import { allCampaigns, createCampaign, getCampaign, sendCampaign, generateSendList, estimatedSendSize, updateCampaignSendEnrollment } from '../CampaignService'
import { allCampaigns, createCampaign, getCampaign, generateSendList, estimatedSendSize, updateCampaignSendEnrollment } from '../CampaignService'
import { createProvider } from '../../providers/ProviderRepository'
import { createTestProject } from '../../projects/__tests__/ProjectTestHelpers'
import ListStatsJob from '../../lists/ListStatsJob'
Expand Down Expand Up @@ -155,20 +153,6 @@ describe('CampaignService', () => {
})
})

describe('sendCampaign', () => {
test('enqueue an email job', async () => {

const campaign = await createTestCampaign()
const user = await createUser(campaign.project_id)

const spy = jest.spyOn(App.main.queue, 'enqueue')
await sendCampaign({ campaign, user })

expect(spy).toHaveBeenCalledTimes(1)
expect(spy.mock.calls[0][0]).toBeInstanceOf(EmailJob)
})
})

describe('sendList', () => {
test('enqueue sends for a list of people', async () => {
const params = await createCampaignDependencies()
Expand Down
2 changes: 1 addition & 1 deletion apps/platform/src/providers/MessageTriggerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export const throttleSend = async (channel: Channel, points = 1): Promise<RateLi
{
limit: provider.rate_limit,
points,
msDuration: provider.interval(),
msDuration: provider.interval,
},
)
}
Expand Down
18 changes: 15 additions & 3 deletions apps/platform/src/providers/Provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ export function ProviderSchema<_ extends ExternalProviderParams, D>(id: string,
} as any
}

type RateInterval = 'second' | 'minute' | 'hour' | 'day'

export default class Provider extends Model {
type!: string
name!: string
Expand All @@ -61,7 +63,7 @@ export default class Provider extends Model {
data!: Record<string, any>
is_default!: boolean
rate_limit!: number
rate_interval!: 'second' | 'minute' | 'hour' | 'day'
rate_interval!: RateInterval
setup!: ProviderSetupMeta[]

static jsonAttributes = ['data']
Expand Down Expand Up @@ -109,7 +111,7 @@ export default class Provider extends Model {

static get options(): ProviderOptions { return {} }

interval() {
get interval() {
const intervals = {
second: 1000,
minute: 60 * 1000,
Expand All @@ -118,11 +120,21 @@ export default class Provider extends Model {
}
return intervals[this.rate_interval || 'second']
}

ratePer(period: RateInterval) {
const intervals = {
second: 1,
minute: 60,
hour: 60 * 60,
day: 24 * 60 * 60,
}
return this.rate_limit * intervals[period]
}
}

export type ProviderMap<T extends Provider> = (record: any) => T

export type ProviderParams = Omit<Provider, ModelParams | 'setup' | 'loadSetup' | 'interval'>
export type ProviderParams = Omit<Provider, ModelParams | 'setup' | 'loadSetup' | 'interval' | 'ratePer'>

export type ExternalProviderParams = Omit<ProviderParams, 'group'>

Expand Down
28 changes: 19 additions & 9 deletions apps/platform/src/queue/MemoryQueueProvider.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { sleep } from '../utilities'
import { sleep, uuid } from '../utilities'
import Job from './Job'
import Queue, { QueueTypeConfig } from './Queue'
import QueueProvider from './QueueProvider'
Expand All @@ -9,7 +9,8 @@ export interface MemoryConfig extends QueueTypeConfig {

export default class MemoryQueueProvider implements QueueProvider {
queue: Queue
backlog: Job[] = []
jobs: Record<string, Job> = {}
backlog: string[] = []
loop: NodeJS.Timeout | undefined
batchSize = 1000 as const

Expand All @@ -18,11 +19,15 @@ export default class MemoryQueueProvider implements QueueProvider {
}

async enqueue(job: Job): Promise<void> {
this.backlog.push(job)
const jobId = job.options.jobId ?? uuid()
if (!this.jobs[jobId]) {
this.jobs[jobId] = job
this.backlog.push(jobId)
}
}

async enqueueBatch(jobs: Job[]): Promise<void> {
this.backlog.push(...jobs)
for (const job of jobs) this.enqueue(job)
}

async delay(job: Job, milliseconds: number): Promise<void> {
Expand All @@ -42,12 +47,17 @@ export default class MemoryQueueProvider implements QueueProvider {
}

private async process() {
let job = this.backlog.shift()
while (job) {
if (job) {
await this.queue.dequeue(job)
let jobId = this.backlog.shift()
while (jobId) {

// If we have a jobId fetch job and dequeue
if (jobId) {
const job = this.jobs[jobId]
if (job) await this.queue.dequeue(job)
delete this.jobs[jobId]
}
job = this.backlog.shift()

jobId = this.backlog.shift()
}
await sleep(1000)
await this.process()
Expand Down

0 comments on commit 9a4f6da

Please sign in to comment.