Skip to content

Commit

Permalink
Adds Redis rate limiter to ensure provider send rate is upheld (#131)
Browse files Browse the repository at this point in the history
* Adds Redis rate limiter to ensure provider send rate is upheld

* Tweaks to throttling methods

* Fixes to rate limiting

Improves how rate limiting works by adding a throttled state
  • Loading branch information
pushchris authored Apr 18, 2023
1 parent 13c0cfd commit 4a70551
Show file tree
Hide file tree
Showing 26 changed files with 285 additions and 69 deletions.
6 changes: 3 additions & 3 deletions apps/platform/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# --------------> The compiler image
FROM node:16 AS compile
FROM node:18 AS compile
WORKDIR /usr/src/app/apps/platform
COPY ./tsconfig.base.json /usr/src/app
COPY ./apps/platform ./
RUN npm ci
RUN npm run build

# --------------> The build image
FROM node:16 AS build
FROM node:18 AS build
WORKDIR /usr/src/app
COPY --from=compile /usr/src/app/apps/platform/package*.json ./
COPY --from=compile /usr/src/app/apps/platform/build ./
Expand All @@ -17,7 +17,7 @@ COPY --from=compile /usr/src/app/apps/platform/public ./public
RUN npm ci --only=production

# --------------> The production image
FROM node:16-alpine
FROM node:18-alpine
RUN apk add dumb-init
ENV NODE_ENV="production"
USER node
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
exports.up = async function(knex) {
await knex.schema.table('providers', function(table) {
table.integer('rate_limit').nullable().after('is_default')
})
}

exports.down = async function(knex) {
await knex.schema.table('providers', function(table) {
table.dropColumn('rate_limit')
})
}
19 changes: 19 additions & 0 deletions apps/platform/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import loadQueue from './config/queue'
import loadStorage from './config/storage'
import loadAuth from './config/auth'
import loadError, { logger } from './config/logger'
import loadRateLimit, { RateLimiter } from './config/rateLimit'
import type { Env } from './config/env'
import type Queue from './queue'
import Storage from './storage'
Expand Down Expand Up @@ -55,6 +56,7 @@ export default class App {
uuid = uuid()
api?: Api
worker?: Worker
rateLimiter: RateLimiter
#registered: { [key: string | number]: unknown }

// eslint-disable-next-line no-useless-constructor
Expand All @@ -67,6 +69,8 @@ export default class App {
public error: ErrorHandler,
) {
this.#registered = {}
this.rateLimiter = loadRateLimit(env.redis)
this.unhandledErrorListener()
}

async start() {
Expand All @@ -89,6 +93,7 @@ export default class App {
await this.worker?.close()
await this.db.destroy()
await this.queue.close()
await this.rateLimiter.close()
}

get<T>(key: number | string): T {
Expand All @@ -102,4 +107,18 @@ export default class App {
remove(key: number | string) {
delete this.#registered[key]
}

unhandledErrorListener() {
['exit', 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'SIGTERM'].forEach((eventType) => {
process.on(eventType, async () => {
await this.close()
process.exit()
})
})
process.on('uncaughtException', async (error) => {
logger.error(error, 'uncaught error')
await this.close()
process.exit()
})
}
}
2 changes: 1 addition & 1 deletion apps/platform/src/campaigns/Campaign.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export type CampaignParams = Omit<Campaign, ModelParams | 'delivery' | 'screensh
export type CampaignCreateParams = Omit<CampaignParams, 'state'>
export type CampaignUpdateParams = Omit<CampaignParams, 'channel'>

export type CampaignSendState = 'pending' | 'sent' | 'failed' | 'bounced' | 'aborted'
export type CampaignSendState = 'pending' | 'sent' | 'throttled' | 'failed' | 'bounced' | 'aborted'
export class CampaignSend extends Model {
campaign_id!: number
user_id!: number
Expand Down
28 changes: 19 additions & 9 deletions apps/platform/src/campaigns/CampaignService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,24 @@ export const updateSendState = async (campaign: Campaign | number, user: User |
const userId = user instanceof User ? user.id : user
const campaignId = campaign instanceof Campaign ? campaign.id : campaign

return await CampaignSend.query()
.insert({
user_id: userId,
campaign_id: campaignId,
state,
})
.onConflict(['user_id', 'list_id'])
.merge(['state'])
// Update send state
const records = await CampaignSend.update(
qb => qb.where('user_id', userId)
.where('campaign_id', campaignId),
{ state },
)

// If no records were updated then try and create missing record
if (records <= 0) {
await CampaignSend.query()
.insert({
user_id: userId,
campaign_id: campaignId,
state,
})
.onConflict(['user_id', 'list_id'])
.merge(['state'])
}
}

export const generateSendList = async (campaign: SentCampaign) => {
Expand Down Expand Up @@ -345,7 +355,7 @@ const totalUsersCount = async (listIds: number[], exclusionListIds: number[]): P
export const campaignProgress = async (campaign: Campaign): Promise<CampaignProgress> => {
const progress = await CampaignSend.query()
.where('campaign_id', campaign.id)
.select(CampaignSend.raw("SUM(IF(state = 'sent', 1, 0)) AS sent, SUM(IF(state = 'pending', 1, 0)) AS pending, COUNT(*) AS total, SUM(IF(opened_at IS NOT NULL, 1, 0)) AS opens, SUM(IF(clicks > 0, 1, 0)) AS clicks"))
.select(CampaignSend.raw("SUM(IF(state = 'sent', 1, 0)) AS sent, SUM(IF(state IN('pending', 'throttled'), 1, 0)) AS pending, COUNT(*) AS total, SUM(IF(opened_at IS NOT NULL, 1, 0)) AS opens, SUM(IF(clicks > 0, 1, 0)) AS clicks"))
.first()
return {
sent: parseInt(progress.sent ?? 0),
Expand Down
5 changes: 3 additions & 2 deletions apps/platform/src/campaigns/CampaignStateJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ export default class CampaignStateJob extends Job {

static async handler() {
const campaigns = await Campaign.query()
.whereIn('state', ['running', 'finished'])
.whereIn('state', ['scheduled', 'running', 'finished'])
for (const campaign of campaigns) {
const { sent, pending, total, opens, clicks } = await campaignProgress(campaign)
await updateCampaignProgress(campaign.id, campaign.project_id, pending <= 0 ? 'finished' : campaign.state, { sent, total, opens, clicks })
const state = pending <= 0 ? 'finished' : sent === 0 ? 'scheduled' : 'running'
await updateCampaignProgress(campaign.id, campaign.project_id, state, { sent, total, opens, clicks })
}
}
}
7 changes: 7 additions & 0 deletions apps/platform/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { QueueConfig } from '../queue/Queue'
import type { DatabaseConfig } from './database'
import type { AuthConfig } from '../auth/Auth'
import type { ErrorConfig } from '../error/ErrorHandler'
import { RedisConfig } from './redis'

export type Runner = 'api' | 'worker'
export interface Env {
Expand All @@ -16,6 +17,7 @@ export interface Env {
secret: string
auth: AuthConfig
error: ErrorConfig
redis: RedisConfig
tracking: {
linkWrap: boolean,
deeplinkMirrorUrl: string | undefined,
Expand Down Expand Up @@ -52,6 +54,11 @@ export default (type?: EnvType): Env => {
database: process.env.DB_DATABASE!,
},
},
redis: {
host: process.env.REDIS_HOST!,
port: parseInt(process.env.REDIS_PORT!),
tls: process.env.REDIS_TLS === 'true',
},
queue: driver<QueueConfig>(process.env.QUEUE_DRIVER, {
sqs: () => ({
queueUrl: process.env.AWS_SQS_QUEUE_URL!,
Expand Down
66 changes: 66 additions & 0 deletions apps/platform/src/config/rateLimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { DefaultRedis, Redis, RedisConfig } from './redis'

// eslint-disable-next-line quotes
const incrTtlLuaScript = `redis.call('set', KEYS[1], 0, 'EX', ARGV[2], 'NX') \
local consumed = redis.call('incrby', KEYS[1], ARGV[1]) \
local ttl = redis.call('pttl', KEYS[1]) \
if ttl == -1 then \
redis.call('expire', KEYS[1], ARGV[2]) \
ttl = 1000 * ARGV[2] \
end \
return {consumed, ttl} \
`

interface RateLimitRedis extends Redis {
rlflxIncr(key: string, points: number, secDuration: number): Promise<[ consumed: number, ttl: number ]>
}

export interface RateLimitResponse {
exceeded: boolean
pointsRemaining: number
pointsUsed: number
msRemaining: number
}

export default (config: RedisConfig) => {
return new RateLimiter(config)
}

export class RateLimiter {
client: RateLimitRedis
constructor(config: RedisConfig) {
this.client = DefaultRedis(config) as RateLimitRedis
this.client.defineCommand('rlflxIncr', {
numberOfKeys: 1,
lua: incrTtlLuaScript,
})
}

async consume(key: string, limit: number, msDuration = 1000): Promise<RateLimitResponse> {
const secDuration = Math.floor(msDuration / 1000)
const response = await this.client.rlflxIncr(key, 1, secDuration)
return {
exceeded: response[0] > limit,
pointsRemaining: Math.max(0, limit - response[0]),
pointsUsed: response[0],
msRemaining: response[1],
}
}

async get(key: string) {
const response = await this.client
.multi()
.get(key)
.pttl(key)
.exec()
if (!response) return undefined
return {
pointsUsed: parseInt(response[0][1] as string, 10),
msRemaining: response[1][1],
}
}

async close() {
this.client.disconnect()
}
}
20 changes: 20 additions & 0 deletions apps/platform/src/config/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import IORedis, { Redis } from 'ioredis'

export interface RedisConfig {
host: string
port: number
tls: boolean
}

export const DefaultRedis = (config: RedisConfig, extraOptions = {}): Redis => {
return new IORedis({
port: config.port,
host: config.host,
tls: config.tls
? { rejectUnauthorized: false }
: undefined,
...extraOptions,
})
}

export { Redis }
8 changes: 6 additions & 2 deletions apps/platform/src/config/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Model from '../core/Model'
import { sleep, randomInt } from '../utilities'
import CampaignStateJob from '../campaigns/CampaignStateJob'

export default async (app: App) => {
export default (app: App) => {
const scheduler = new Scheduler(app)
scheduler.schedule({
rule: '* * * * *',
Expand Down Expand Up @@ -43,7 +43,7 @@ interface Schedule {
lockLength?: number
}

class Scheduler {
export class Scheduler {
app: App
constructor(app: App) {
this.app = app
Expand All @@ -61,6 +61,10 @@ class Scheduler {
}
})
}

async close() {
return await nodeScheduler.gracefulShutdown()
}
}

class JobLock extends Model {
Expand Down
2 changes: 1 addition & 1 deletion apps/platform/src/projects/ProjectAdminController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ router.put('/:adminId', async ctx => {
const admin = await Admin.find(ctx.params.adminId)
if (!admin) throw new RequestError('Invalid admin ID', 404)
const { role } = validate(projectAdminParamsSchema, ctx.request.body)
if (ctx.state.admin!.id !== admin.id) throw new RequestError('You cannot add yourself to a project')
if (ctx.state.admin!.id === admin.id) throw new RequestError('You cannot add yourself to a project')
await addAdminToProject(ctx.state.project.id, admin.id, role)
ctx.body = await getProjectAdmin(ctx.state.project.id, admin.id)
})
Expand Down
23 changes: 23 additions & 0 deletions apps/platform/src/providers/MessageTriggerService.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import App from '../app'
import Campaign from '../campaigns/Campaign'
import { updateSendState } from '../campaigns/CampaignService'
import { RateLimitResponse } from '../config/rateLimit'
import Project from '../projects/Project'
import { EncodedJob } from '../queue'
import { RenderContext } from '../render'
import Template, { TemplateType } from '../render/Template'
import { User } from '../users/User'
import { UserEvent } from '../users/UserEvent'
import { partialMatchLocale } from '../utilities'
import EmailChannel from './email/EmailChannel'
import { MessageTrigger } from './MessageTrigger'
import TextChannel from './text/TextChannel'

interface MessageTriggerHydrated<T> {
user: User
Expand Down Expand Up @@ -61,3 +66,21 @@ export async function loadSendJob<T extends TemplateType>({ campaign_id, user_id

return { campaign, template: template.map() as T, user, project, event, context }
}

export const throttleSend = async (channel: EmailChannel | TextChannel): Promise<RateLimitResponse | undefined> => {
const provider = channel.provider

// If no rate limit, just break
if (!provider.rate_limit) return

// Otherwise consume points and check rate
return await App.main.rateLimiter.consume(
`ratelimit-${provider.id}`,
provider.rate_limit,
)
}

export const requeueSend = async (job: EncodedJob, delay: number): Promise<void> => {
job.options.delay = delay
return App.main.queue.enqueue(job)
}
6 changes: 6 additions & 0 deletions apps/platform/src/providers/Provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ export const ProviderSchema = <T extends ExternalProviderParams, D>(id: string,
nullable: true,
},
data,
rate_limit: {
type: 'number',
description: 'The per second maximum send rate.',
nullable: true,
},
},
additionalProperties: false,
} as any
Expand All @@ -41,6 +46,7 @@ export default class Provider extends Model {
group!: ProviderGroup
data!: Record<string, any>
is_default!: boolean
rate_limit!: number

static jsonAttributes = ['data']

Expand Down
Loading

0 comments on commit 4a70551

Please sign in to comment.