diff --git a/apps/platform/Dockerfile b/apps/platform/Dockerfile index 41daa82d..45b3f5c9 100644 --- a/apps/platform/Dockerfile +++ b/apps/platform/Dockerfile @@ -1,5 +1,5 @@ # --------------> The compiler image -FROM node:16 AS compile +FROM node:18 AS compile WORKDIR /usr/src/app/apps/platform COPY ./tsconfig.base.json /usr/src/app COPY ./apps/platform ./ @@ -7,7 +7,7 @@ RUN npm ci RUN npm run build # --------------> The build image -FROM node:16 AS build +FROM node:18 AS build WORKDIR /usr/src/app COPY --from=compile /usr/src/app/apps/platform/package*.json ./ COPY --from=compile /usr/src/app/apps/platform/build ./ @@ -17,7 +17,7 @@ COPY --from=compile /usr/src/app/apps/platform/public ./public RUN npm ci --only=production # --------------> The production image -FROM node:16-alpine +FROM node:18-alpine RUN apk add dumb-init ENV NODE_ENV="production" USER node diff --git a/apps/platform/db/migrations/20230418041529_add_provider_rate_limiter.js b/apps/platform/db/migrations/20230418041529_add_provider_rate_limiter.js new file mode 100644 index 00000000..4123d85f --- /dev/null +++ b/apps/platform/db/migrations/20230418041529_add_provider_rate_limiter.js @@ -0,0 +1,11 @@ +exports.up = async function(knex) { + await knex.schema.table('providers', function(table) { + table.integer('rate_limit').nullable().after('is_default') + }) +} + +exports.down = async function(knex) { + await knex.schema.table('providers', function(table) { + table.dropColumn('rate_limit') + }) +} diff --git a/apps/platform/src/app.ts b/apps/platform/src/app.ts index 1977e4d3..11819175 100644 --- a/apps/platform/src/app.ts +++ b/apps/platform/src/app.ts @@ -3,6 +3,7 @@ import loadQueue from './config/queue' import loadStorage from './config/storage' import loadAuth from './config/auth' import loadError, { logger } from './config/logger' +import loadRateLimit, { RateLimiter } from './config/rateLimit' import type { Env } from './config/env' import type Queue from './queue' import Storage from './storage' @@ -55,6 +56,7 @@ export default class App { uuid = uuid() api?: Api worker?: Worker + rateLimiter: RateLimiter #registered: { [key: string | number]: unknown } // eslint-disable-next-line no-useless-constructor @@ -67,6 +69,8 @@ export default class App { public error: ErrorHandler, ) { this.#registered = {} + this.rateLimiter = loadRateLimit(env.redis) + this.unhandledErrorListener() } async start() { @@ -89,6 +93,7 @@ export default class App { await this.worker?.close() await this.db.destroy() await this.queue.close() + await this.rateLimiter.close() } get(key: number | string): T { @@ -102,4 +107,18 @@ export default class App { remove(key: number | string) { delete this.#registered[key] } + + unhandledErrorListener() { + ['exit', 'SIGINT', 'SIGUSR1', 'SIGUSR2', 'SIGTERM'].forEach((eventType) => { + process.on(eventType, async () => { + await this.close() + process.exit() + }) + }) + process.on('uncaughtException', async (error) => { + logger.error(error, 'uncaught error') + await this.close() + process.exit() + }) + } } diff --git a/apps/platform/src/campaigns/Campaign.ts b/apps/platform/src/campaigns/Campaign.ts index 75efc3cf..b6434558 100644 --- a/apps/platform/src/campaigns/Campaign.ts +++ b/apps/platform/src/campaigns/Campaign.ts @@ -46,7 +46,7 @@ export type CampaignParams = Omit export type CampaignUpdateParams = Omit -export type CampaignSendState = 'pending' | 'sent' | 'failed' | 'bounced' | 'aborted' +export type CampaignSendState = 'pending' | 'sent' | 'throttled' | 'failed' | 'bounced' | 'aborted' export class CampaignSend extends Model { campaign_id!: number user_id!: number diff --git a/apps/platform/src/campaigns/CampaignService.ts b/apps/platform/src/campaigns/CampaignService.ts index 096ef39e..0819eef9 100644 --- a/apps/platform/src/campaigns/CampaignService.ts +++ b/apps/platform/src/campaigns/CampaignService.ts @@ -203,14 +203,24 @@ export const updateSendState = async (campaign: Campaign | number, user: User | const userId = user instanceof User ? user.id : user const campaignId = campaign instanceof Campaign ? campaign.id : campaign - return await CampaignSend.query() - .insert({ - user_id: userId, - campaign_id: campaignId, - state, - }) - .onConflict(['user_id', 'list_id']) - .merge(['state']) + // Update send state + const records = await CampaignSend.update( + qb => qb.where('user_id', userId) + .where('campaign_id', campaignId), + { state }, + ) + + // If no records were updated then try and create missing record + if (records <= 0) { + await CampaignSend.query() + .insert({ + user_id: userId, + campaign_id: campaignId, + state, + }) + .onConflict(['user_id', 'list_id']) + .merge(['state']) + } } export const generateSendList = async (campaign: SentCampaign) => { @@ -345,7 +355,7 @@ const totalUsersCount = async (listIds: number[], exclusionListIds: number[]): P export const campaignProgress = async (campaign: Campaign): Promise => { const progress = await CampaignSend.query() .where('campaign_id', campaign.id) - .select(CampaignSend.raw("SUM(IF(state = 'sent', 1, 0)) AS sent, SUM(IF(state = 'pending', 1, 0)) AS pending, COUNT(*) AS total, SUM(IF(opened_at IS NOT NULL, 1, 0)) AS opens, SUM(IF(clicks > 0, 1, 0)) AS clicks")) + .select(CampaignSend.raw("SUM(IF(state = 'sent', 1, 0)) AS sent, SUM(IF(state IN('pending', 'throttled'), 1, 0)) AS pending, COUNT(*) AS total, SUM(IF(opened_at IS NOT NULL, 1, 0)) AS opens, SUM(IF(clicks > 0, 1, 0)) AS clicks")) .first() return { sent: parseInt(progress.sent ?? 0), diff --git a/apps/platform/src/campaigns/CampaignStateJob.ts b/apps/platform/src/campaigns/CampaignStateJob.ts index b95504a7..d144d58d 100644 --- a/apps/platform/src/campaigns/CampaignStateJob.ts +++ b/apps/platform/src/campaigns/CampaignStateJob.ts @@ -7,10 +7,11 @@ export default class CampaignStateJob extends Job { static async handler() { const campaigns = await Campaign.query() - .whereIn('state', ['running', 'finished']) + .whereIn('state', ['scheduled', 'running', 'finished']) for (const campaign of campaigns) { const { sent, pending, total, opens, clicks } = await campaignProgress(campaign) - await updateCampaignProgress(campaign.id, campaign.project_id, pending <= 0 ? 'finished' : campaign.state, { sent, total, opens, clicks }) + const state = pending <= 0 ? 'finished' : sent === 0 ? 'scheduled' : 'running' + await updateCampaignProgress(campaign.id, campaign.project_id, state, { sent, total, opens, clicks }) } } } diff --git a/apps/platform/src/config/env.ts b/apps/platform/src/config/env.ts index c5eb8662..82ecadf7 100644 --- a/apps/platform/src/config/env.ts +++ b/apps/platform/src/config/env.ts @@ -4,6 +4,7 @@ import type { QueueConfig } from '../queue/Queue' import type { DatabaseConfig } from './database' import type { AuthConfig } from '../auth/Auth' import type { ErrorConfig } from '../error/ErrorHandler' +import { RedisConfig } from './redis' export type Runner = 'api' | 'worker' export interface Env { @@ -16,6 +17,7 @@ export interface Env { secret: string auth: AuthConfig error: ErrorConfig + redis: RedisConfig tracking: { linkWrap: boolean, deeplinkMirrorUrl: string | undefined, @@ -52,6 +54,11 @@ export default (type?: EnvType): Env => { database: process.env.DB_DATABASE!, }, }, + redis: { + host: process.env.REDIS_HOST!, + port: parseInt(process.env.REDIS_PORT!), + tls: process.env.REDIS_TLS === 'true', + }, queue: driver(process.env.QUEUE_DRIVER, { sqs: () => ({ queueUrl: process.env.AWS_SQS_QUEUE_URL!, diff --git a/apps/platform/src/config/rateLimit.ts b/apps/platform/src/config/rateLimit.ts new file mode 100644 index 00000000..1e610d21 --- /dev/null +++ b/apps/platform/src/config/rateLimit.ts @@ -0,0 +1,66 @@ +import { DefaultRedis, Redis, RedisConfig } from './redis' + +// eslint-disable-next-line quotes +const incrTtlLuaScript = `redis.call('set', KEYS[1], 0, 'EX', ARGV[2], 'NX') \ +local consumed = redis.call('incrby', KEYS[1], ARGV[1]) \ +local ttl = redis.call('pttl', KEYS[1]) \ +if ttl == -1 then \ + redis.call('expire', KEYS[1], ARGV[2]) \ + ttl = 1000 * ARGV[2] \ +end \ +return {consumed, ttl} \ +` + +interface RateLimitRedis extends Redis { + rlflxIncr(key: string, points: number, secDuration: number): Promise<[ consumed: number, ttl: number ]> +} + +export interface RateLimitResponse { + exceeded: boolean + pointsRemaining: number + pointsUsed: number + msRemaining: number +} + +export default (config: RedisConfig) => { + return new RateLimiter(config) +} + +export class RateLimiter { + client: RateLimitRedis + constructor(config: RedisConfig) { + this.client = DefaultRedis(config) as RateLimitRedis + this.client.defineCommand('rlflxIncr', { + numberOfKeys: 1, + lua: incrTtlLuaScript, + }) + } + + async consume(key: string, limit: number, msDuration = 1000): Promise { + const secDuration = Math.floor(msDuration / 1000) + const response = await this.client.rlflxIncr(key, 1, secDuration) + return { + exceeded: response[0] > limit, + pointsRemaining: Math.max(0, limit - response[0]), + pointsUsed: response[0], + msRemaining: response[1], + } + } + + async get(key: string) { + const response = await this.client + .multi() + .get(key) + .pttl(key) + .exec() + if (!response) return undefined + return { + pointsUsed: parseInt(response[0][1] as string, 10), + msRemaining: response[1][1], + } + } + + async close() { + this.client.disconnect() + } +} diff --git a/apps/platform/src/config/redis.ts b/apps/platform/src/config/redis.ts new file mode 100644 index 00000000..2e4681ee --- /dev/null +++ b/apps/platform/src/config/redis.ts @@ -0,0 +1,20 @@ +import IORedis, { Redis } from 'ioredis' + +export interface RedisConfig { + host: string + port: number + tls: boolean +} + +export const DefaultRedis = (config: RedisConfig, extraOptions = {}): Redis => { + return new IORedis({ + port: config.port, + host: config.host, + tls: config.tls + ? { rejectUnauthorized: false } + : undefined, + ...extraOptions, + }) +} + +export { Redis } diff --git a/apps/platform/src/config/scheduler.ts b/apps/platform/src/config/scheduler.ts index 8b5e893e..fc077602 100644 --- a/apps/platform/src/config/scheduler.ts +++ b/apps/platform/src/config/scheduler.ts @@ -9,7 +9,7 @@ import Model from '../core/Model' import { sleep, randomInt } from '../utilities' import CampaignStateJob from '../campaigns/CampaignStateJob' -export default async (app: App) => { +export default (app: App) => { const scheduler = new Scheduler(app) scheduler.schedule({ rule: '* * * * *', @@ -43,7 +43,7 @@ interface Schedule { lockLength?: number } -class Scheduler { +export class Scheduler { app: App constructor(app: App) { this.app = app @@ -61,6 +61,10 @@ class Scheduler { } }) } + + async close() { + return await nodeScheduler.gracefulShutdown() + } } class JobLock extends Model { diff --git a/apps/platform/src/projects/ProjectAdminController.ts b/apps/platform/src/projects/ProjectAdminController.ts index bd0ca1b5..83d843e5 100644 --- a/apps/platform/src/projects/ProjectAdminController.ts +++ b/apps/platform/src/projects/ProjectAdminController.ts @@ -40,7 +40,7 @@ router.put('/:adminId', async ctx => { const admin = await Admin.find(ctx.params.adminId) if (!admin) throw new RequestError('Invalid admin ID', 404) const { role } = validate(projectAdminParamsSchema, ctx.request.body) - if (ctx.state.admin!.id !== admin.id) throw new RequestError('You cannot add yourself to a project') + if (ctx.state.admin!.id === admin.id) throw new RequestError('You cannot add yourself to a project') await addAdminToProject(ctx.state.project.id, admin.id, role) ctx.body = await getProjectAdmin(ctx.state.project.id, admin.id) }) diff --git a/apps/platform/src/providers/MessageTriggerService.ts b/apps/platform/src/providers/MessageTriggerService.ts index 7c05d61d..bb1f7359 100644 --- a/apps/platform/src/providers/MessageTriggerService.ts +++ b/apps/platform/src/providers/MessageTriggerService.ts @@ -1,12 +1,17 @@ +import App from '../app' import Campaign from '../campaigns/Campaign' import { updateSendState } from '../campaigns/CampaignService' +import { RateLimitResponse } from '../config/rateLimit' import Project from '../projects/Project' +import { EncodedJob } from '../queue' import { RenderContext } from '../render' import Template, { TemplateType } from '../render/Template' import { User } from '../users/User' import { UserEvent } from '../users/UserEvent' import { partialMatchLocale } from '../utilities' +import EmailChannel from './email/EmailChannel' import { MessageTrigger } from './MessageTrigger' +import TextChannel from './text/TextChannel' interface MessageTriggerHydrated { user: User @@ -61,3 +66,21 @@ export async function loadSendJob({ campaign_id, user_id return { campaign, template: template.map() as T, user, project, event, context } } + +export const throttleSend = async (channel: EmailChannel | TextChannel): Promise => { + const provider = channel.provider + + // If no rate limit, just break + if (!provider.rate_limit) return + + // Otherwise consume points and check rate + return await App.main.rateLimiter.consume( + `ratelimit-${provider.id}`, + provider.rate_limit, + ) +} + +export const requeueSend = async (job: EncodedJob, delay: number): Promise => { + job.options.delay = delay + return App.main.queue.enqueue(job) +} diff --git a/apps/platform/src/providers/Provider.ts b/apps/platform/src/providers/Provider.ts index feaa4dfa..ced50002 100644 --- a/apps/platform/src/providers/Provider.ts +++ b/apps/platform/src/providers/Provider.ts @@ -29,6 +29,11 @@ export const ProviderSchema = (id: string, nullable: true, }, data, + rate_limit: { + type: 'number', + description: 'The per second maximum send rate.', + nullable: true, + }, }, additionalProperties: false, } as any @@ -41,6 +46,7 @@ export default class Provider extends Model { group!: ProviderGroup data!: Record is_default!: boolean + rate_limit!: number static jsonAttributes = ['data'] diff --git a/apps/platform/src/providers/email/EmailJob.ts b/apps/platform/src/providers/email/EmailJob.ts index 4275ce1c..30548fb1 100644 --- a/apps/platform/src/providers/email/EmailJob.ts +++ b/apps/platform/src/providers/email/EmailJob.ts @@ -1,10 +1,11 @@ -import { Job } from '../../queue' +import Job from '../../queue/Job' import { createEvent } from '../../users/UserEventRepository' import { MessageTrigger } from '../MessageTrigger' import { updateSendState } from '../../campaigns/CampaignService' import { loadEmailChannel } from '.' -import { loadSendJob } from '../MessageTriggerService' +import { loadSendJob, requeueSend, throttleSend } from '../MessageTriggerService' import { EmailTemplate } from '../../render/Template' +import { EncodedJob } from '../../queue' export default class EmailJob extends Job { static $name = 'email' @@ -13,16 +14,11 @@ export default class EmailJob extends Job { return new this(data) } - static async handler(trigger: MessageTrigger) { + static async handler(trigger: MessageTrigger, raw: EncodedJob) { const data = await loadSendJob(trigger) if (!data) return - const { campaign, template, user, project, event } = data - const context = { - campaign_id: campaign.id, - template_id: template.id, - subscription_id: campaign.subscription_id, - } + const { campaign, template, user, project, event, context } = data // Send and render email const channel = await loadEmailChannel(campaign.provider_id, project.id) @@ -30,6 +26,21 @@ export default class EmailJob extends Job { await updateSendState(campaign, user, 'failed') return } + + // Check current send rate, if exceeded then requeue job + // at a time in the future + const rateCheck = await throttleSend(channel) + if (rateCheck?.exceeded) { + + // Mark state as throttled so it is not continuously added + // to the queue + await updateSendState(campaign, user, 'throttled') + + // Schedule the resend for after the throttle finishes + await requeueSend(raw, rateCheck.msRemaining) + return + } + await channel.send(template, { user, event, context }) // Update send record diff --git a/apps/platform/src/providers/text/TextJob.ts b/apps/platform/src/providers/text/TextJob.ts index 8fa1050d..ac516b1e 100644 --- a/apps/platform/src/providers/text/TextJob.ts +++ b/apps/platform/src/providers/text/TextJob.ts @@ -1,9 +1,9 @@ -import { Job } from '../../queue' +import Job, { EncodedJob } from '../../queue/Job' import { TextTemplate } from '../../render/Template' import { createEvent } from '../../users/UserEventRepository' import { MessageTrigger } from '../MessageTrigger' import { updateSendState } from '../../campaigns/CampaignService' -import { loadSendJob } from '../MessageTriggerService' +import { loadSendJob, requeueSend, throttleSend } from '../MessageTriggerService' import { loadTextChannel } from '.' export default class TextJob extends Job { @@ -13,7 +13,7 @@ export default class TextJob extends Job { return new this(data) } - static async handler(trigger: MessageTrigger) { + static async handler(trigger: MessageTrigger, raw: EncodedJob) { const data = await loadSendJob(trigger) if (!data) return @@ -26,6 +26,21 @@ export default class TextJob extends Job { await updateSendState(campaign, user, 'failed') return } + + // Check current send rate, if exceeded then requeue job + // at a time in the future + const rateCheck = await throttleSend(channel) + if (rateCheck?.exceeded) { + + // Mark state as throttled so it is not continuously added + // to the queue + await updateSendState(campaign, user, 'throttled') + + // Schedule the resend for after the throttle finishes + await requeueSend(raw, rateCheck.msRemaining) + return + } + await channel.send(template, { user, event, context }) // Update send record diff --git a/apps/platform/src/queue/Job.ts b/apps/platform/src/queue/Job.ts index 0f69b9d3..2db404b9 100644 --- a/apps/platform/src/queue/Job.ts +++ b/apps/platform/src/queue/Job.ts @@ -37,7 +37,7 @@ export default class Job implements EncodedJob { } // eslint-disable-next-line @typescript-eslint/no-unused-vars - static async handler(_: any): Promise { + static async handler(data: any, raw?: EncodedJob): Promise { return Promise.reject(new Error('Handler not defined.')) } diff --git a/apps/platform/src/queue/Queue.ts b/apps/platform/src/queue/Queue.ts index b735c531..a752a45e 100644 --- a/apps/platform/src/queue/Queue.ts +++ b/apps/platform/src/queue/Queue.ts @@ -5,10 +5,10 @@ import { LoggerConfig } from '../providers/LoggerProvider' import Job, { EncodedJob } from './Job' import MemoryQueueProvider, { MemoryConfig } from './MemoryQueueProvider' import QueueProvider, { MetricPeriod, QueueMetric, QueueProviderName } from './QueueProvider' -import RedisQueueProvider, { RedisConfig } from './RedisQueueProvider' +import RedisQueueProvider, { RedisQueueConfig } from './RedisQueueProvider' import SQSQueueProvider, { SQSConfig } from './SQSQueueProvider' -export type QueueConfig = SQSConfig | RedisConfig | MemoryConfig | LoggerConfig +export type QueueConfig = SQSConfig | RedisQueueConfig | MemoryConfig | LoggerConfig export interface QueueTypeConfig extends DriverConfig { driver: QueueProviderName @@ -16,7 +16,7 @@ export interface QueueTypeConfig extends DriverConfig { export default class Queue { provider: QueueProvider - jobs: Record Promise> = {} + jobs: Record Promise> = {} constructor(config?: QueueConfig) { if (config?.driver === 'sqs') { @@ -33,7 +33,7 @@ export default class Queue { async dequeue(job: EncodedJob): Promise { try { await this.started(job) - await this.jobs[job.name](job.data) + await this.jobs[job.name](job.data, job) await this.completed(job) } catch (error: any) { this.errored(error, job) @@ -41,12 +41,12 @@ export default class Queue { return true } - async enqueue(job: Job): Promise { - logger.info(job.toJSON(), 'queue:job:enqueued') + async enqueue(job: Job | EncodedJob): Promise { + logger.info(job instanceof Job ? job.toJSON() : job, 'queue:job:enqueued') return await this.provider.enqueue(job) } - async enqueueBatch(jobs: Job[]): Promise { + async enqueueBatch(jobs: EncodedJob[]): Promise { logger.info({ count: jobs.length }, 'queue:job:enqueuedBatch') return await this.provider.enqueueBatch(jobs) } diff --git a/apps/platform/src/queue/QueueProvider.ts b/apps/platform/src/queue/QueueProvider.ts index 668faaf1..c5e3a2a7 100644 --- a/apps/platform/src/queue/QueueProvider.ts +++ b/apps/platform/src/queue/QueueProvider.ts @@ -1,5 +1,5 @@ import Queue from './Queue' -import Job from './Job' +import { EncodedJob } from './Job' export type QueueProviderName = 'sqs' | 'redis' | 'memory' | 'logger' @@ -24,8 +24,8 @@ export enum MetricPeriod { export default interface QueueProvider { queue: Queue batchSize: number - enqueue(job: Job): Promise - enqueueBatch(jobs: Job[]): Promise + enqueue(job: EncodedJob): Promise + enqueueBatch(jobs: EncodedJob[]): Promise start(): void close(): void metrics?(period: MetricPeriod): Promise diff --git a/apps/platform/src/queue/RedisQueueProvider.ts b/apps/platform/src/queue/RedisQueueProvider.ts index 0153a2fb..0b0b643b 100644 --- a/apps/platform/src/queue/RedisQueueProvider.ts +++ b/apps/platform/src/queue/RedisQueueProvider.ts @@ -2,16 +2,13 @@ import { MetricsTime, Queue as BullQueue, Worker } from 'bullmq' import { subMinutes } from 'date-fns' import { logger } from '../config/logger' import { batch } from '../utilities' -import Job, { EncodedJob } from './Job' +import { EncodedJob } from './Job' import Queue, { QueueTypeConfig } from './Queue' import QueueProvider, { MetricPeriod, QueueMetric } from './QueueProvider' -import IORedis, { Redis } from 'ioredis' +import { DefaultRedis, Redis, RedisConfig } from '../config/redis' -export interface RedisConfig extends QueueTypeConfig { +export interface RedisQueueConfig extends QueueTypeConfig, RedisConfig { driver: 'redis' - host: string - port: number - tls: boolean } export default class RedisQueueProvider implements QueueProvider { @@ -23,14 +20,9 @@ export default class RedisQueueProvider implements QueueProvider { batchSize = 40 as const queueName = 'parcelvoy' as const - constructor(config: RedisConfig, queue: Queue) { + constructor(config: RedisQueueConfig, queue: Queue) { this.queue = queue - this.redis = new IORedis({ - port: config.port, - host: config.host, - tls: config.tls - ? { rejectUnauthorized: false } - : undefined, + this.redis = DefaultRedis(config, { maxRetriesPerRequest: null, }) this.bull = new BullQueue(this.queueName, { @@ -45,7 +37,7 @@ export default class RedisQueueProvider implements QueueProvider { }) } - async enqueue(job: Job): Promise { + async enqueue(job: EncodedJob): Promise { try { const { name, data, opts } = this.adaptJob(job) await this.bull.add(name, data, opts) @@ -54,13 +46,13 @@ export default class RedisQueueProvider implements QueueProvider { } } - async enqueueBatch(jobs: Job[]): Promise { + async enqueueBatch(jobs: EncodedJob[]): Promise { for (const part of batch(jobs, this.batchSize)) { await this.bull.addBulk(part.map(item => this.adaptJob(item))) } } - private adaptJob(job: Job) { + private adaptJob(job: EncodedJob) { return { name: job.name, data: job, @@ -90,6 +82,10 @@ export default class RedisQueueProvider implements QueueProvider { this.worker.on('failed', (job, error) => { this.queue.errored(error, job?.data as EncodedJob) }) + + this.worker.on('error', error => { + this.queue.errored(error) + }) } close(): void { diff --git a/apps/platform/src/users/UserImport.ts b/apps/platform/src/users/UserImport.ts index d7edcb18..c11c91e0 100644 --- a/apps/platform/src/users/UserImport.ts +++ b/apps/platform/src/users/UserImport.ts @@ -12,14 +12,14 @@ export interface UserImport { export const importUsers = async ({ project_id, stream, list_id }: UserImport) => { const parser = stream.file.pipe(parse({ columns: true })) - - for await (const { external_id, email, phone, ...data } of parser) { + for await (const { external_id, email, phone, timezone, ...data } of parser) { await App.main.queue.enqueue(UserPatchJob.from({ project_id, user: { external_id, email, - phone, + phone: cleanNulls(timezone), + timezone: cleanNulls(timezone), data, }, options: { @@ -28,3 +28,9 @@ export const importUsers = async ({ project_id, stream, list_id }: UserImport) = })) } } + +const cleanNulls = (value?: string, emptyIsNull = true) => { + if (value === 'NULL' || value == null) return undefined + if (value === '' && emptyIsNull) return undefined + return value +} diff --git a/apps/platform/src/users/UserPatchJob.ts b/apps/platform/src/users/UserPatchJob.ts index 0a2a158b..de0c464f 100644 --- a/apps/platform/src/users/UserPatchJob.ts +++ b/apps/platform/src/users/UserPatchJob.ts @@ -30,6 +30,9 @@ export default class UserPatchJob extends Job { // Check for existing user const existing = await getUserFromClientId(project_id, identity) + // TODO: Utilize phone and email as backup identifiers + // to decrease the likelihood of future duplicates + // If user, update otherwise insert try { return existing @@ -43,7 +46,9 @@ export default class UserPatchJob extends Job { ...fields, }) } catch (error: any) { - // If there is an error (such as constraints, retry) + // If there is an error (such as constraints, + // retry up to three times) + if (tries <= 0) throw error return upsert(patch, --tries) } } diff --git a/apps/platform/src/users/UserRepository.ts b/apps/platform/src/users/UserRepository.ts index 8471e34b..5ed6d381 100644 --- a/apps/platform/src/users/UserRepository.ts +++ b/apps/platform/src/users/UserRepository.ts @@ -60,6 +60,15 @@ export const aliasUser = async (projectId: number, { } as ClientIdentity) if (!previous) return + + // Look up if there is a separate profile with the new ID + // If there is one, the client is not aliasing properly + // and is creating duplicates, just break + const current = await getUserFromClientId(projectId, { + external_id, + } as ClientIdentity) + if (current) return + return await User.updateAndFetch(previous.id, { external_id }) } diff --git a/apps/platform/src/worker.ts b/apps/platform/src/worker.ts index 1ca7883c..fdf89e2b 100644 --- a/apps/platform/src/worker.ts +++ b/apps/platform/src/worker.ts @@ -1,10 +1,10 @@ import { loadWorker } from './config/queue' -import scheduler from './config/scheduler' +import scheduler, { Scheduler } from './config/scheduler' import Queue from './queue' export default class Worker { worker: Queue - scheduler: any + scheduler: Scheduler constructor( public app: import('./app').default, ) { @@ -18,6 +18,6 @@ export default class Worker { async close() { await this.worker.close() - await this.scheduler.gracefulShutdown() + await this.scheduler.close() } } diff --git a/apps/ui/src/types.ts b/apps/ui/src/types.ts index 39c00700..4b95f899 100644 --- a/apps/ui/src/types.ts +++ b/apps/ui/src/types.ts @@ -295,7 +295,7 @@ export interface Campaign { updated_at: string } -export type CampaignSendState = 'pending' | 'sent' | 'failed' +export type CampaignSendState = 'pending' | 'throttled' | 'sent' | 'failed' export type CampaignUpdateParams = Partial> export type CampaignCreateParams = Pick @@ -411,9 +411,10 @@ export interface Provider { type: string group: string data: any + rate_limit: number } -export type ProviderCreateParams = Pick +export type ProviderCreateParams = Pick export type ProviderUpdateParams = ProviderCreateParams export interface ProviderMeta { name: string diff --git a/apps/ui/src/views/campaign/CampaignDelivery.tsx b/apps/ui/src/views/campaign/CampaignDelivery.tsx index 53405418..a76dc630 100644 --- a/apps/ui/src/views/campaign/CampaignDelivery.tsx +++ b/apps/ui/src/views/campaign/CampaignDelivery.tsx @@ -14,6 +14,7 @@ import { useRoute } from '../router' export const CampaignSendTag = ({ state }: { state: CampaignSendState }) => { const variant: Record = { pending: 'info', + throttled: 'warn', sent: 'success', failed: 'error', } diff --git a/apps/ui/src/views/settings/IntegrationModal.tsx b/apps/ui/src/views/settings/IntegrationModal.tsx index 9b6da38b..c6b1e84c 100644 --- a/apps/ui/src/views/settings/IntegrationModal.tsx +++ b/apps/ui/src/views/settings/IntegrationModal.tsx @@ -27,11 +27,11 @@ export default function IntegrationModal({ onChange, provider, ...props }: Integ setMeta(options?.find(item => item.channel === provider?.group && item.type === provider?.type)) }, [provider]) - async function handleCreate({ name, data = {} }: ProviderCreateParams | ProviderUpdateParams, meta: ProviderMeta) { + async function handleCreate({ name, rate_limit, data = {} }: ProviderCreateParams | ProviderUpdateParams, meta: ProviderMeta) { const value = provider?.id - ? await api.providers.update(project.id, provider?.id, { name, data, type: meta.type, group: meta.channel }) - : await api.providers.create(project.id, { name, data, type: meta.type, group: meta.channel }) + ? await api.providers.update(project.id, provider?.id, { name, data, type: meta.type, group: meta.channel, rate_limit }) + : await api.providers.create(project.id, { name, data, type: meta.type, group: meta.channel, rate_limit }) onChange(value) props.onClose(false) @@ -96,6 +96,11 @@ export default function IntegrationModal({ onChange, provider, ...props }: Integ

Config

+ }