diff --git a/apps/platform/db/migrations/20241228214210_modify_journey_user_step_indexes.js b/apps/platform/db/migrations/20241228214210_modify_journey_user_step_indexes.js new file mode 100644 index 00000000..eab10db9 --- /dev/null +++ b/apps/platform/db/migrations/20241228214210_modify_journey_user_step_indexes.js @@ -0,0 +1,13 @@ +exports.up = async function(knex) { + await knex.schema.alterTable('journey_user_step', function(table) { + table.index(['journey_id', 'type', 'delay_until']) + table.dropIndex(['type', 'delay_until']) + }) +} + +exports.down = async function(knex) { + await knex.schema.alterTable('journey_user_step', function(table) { + table.index(['type', 'delay_until']) + table.dropIndex(['journey_id', 'type', 'delay_until']) + }) +} diff --git a/apps/platform/src/lists/ListEvaluateUserJob.ts b/apps/platform/src/lists/ListEvaluateUserJob.ts index d95e527c..d1071164 100644 --- a/apps/platform/src/lists/ListEvaluateUserJob.ts +++ b/apps/platform/src/lists/ListEvaluateUserJob.ts @@ -1,7 +1,6 @@ import App from '../app' import { cacheIncr } from '../config/redis' import { Job } from '../queue' -import { JobPriority } from '../queue/Job' import { getUser } from '../users/UserRepository' import { DynamicList } from './List' import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService' @@ -17,12 +16,6 @@ interface ListEvaluateUserParams { export default class ListEvaluateUserJob extends Job { static $name = 'list_evaluate_user_job' - options = { - delay: 0, - attempts: 3, - priority: JobPriority.low, - } - static from(params: ListEvaluateUserParams): ListEvaluateUserJob { return new this(params) } diff --git a/apps/platform/src/utilities/index.ts b/apps/platform/src/utilities/index.ts index 2b6613a3..49e919a9 100644 --- a/apps/platform/src/utilities/index.ts +++ b/apps/platform/src/utilities/index.ts @@ -224,9 +224,22 @@ export const chunk = async ( modifier: (result: any) => T = (result) => result, ) => { const chunker = new Chunker(callback, size) + const handler = async (result: any, retries = 3) => { + try { + await chunker.add(modifier(result)) + } catch (error: any) { + + // In the case of deadlocks, retry the operation + if (['ER_LOCK_WAIT_TIMEOUT', 'ER_LOCK_DEADLOCK'].includes(error.code) && retries > 0) { + setTimeout(() => handler(result, retries - 1), 250) + } else { + throw error + } + } + } await query.stream(async function(stream) { for await (const result of stream) { - await chunker.add(modifier(result)) + await handler(result) } }) await chunker.flush()