Skip to content

Commit

Permalink
Adds soft-delete for scenarios jobs to fix race condition
Browse files Browse the repository at this point in the history
Race condition could happen when:
- job was marked as finished in hermes worker 
- scenarion engine at the same time loaded finished jobs and deleted them 
- however, hermes worker was not yet finished with already deleted job  This solution adds soft delete to scenarios_jobs 
- jobs are deleted from DB after given delay 

remp/crm#2732
  • Loading branch information
miroc committed Jan 25, 2023
1 parent fb7e985 commit 2104ae9
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 67 deletions.
68 changes: 37 additions & 31 deletions src/Models/Repository/JobsRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

class JobsRepository extends Repository
{
const STATE_CREATED = 'created';
const STATE_SCHEDULED = 'scheduled'; // job is scheduled to run
const STATE_STARTED = 'started'; // job has already started and is running
const STATE_FINISHED = 'finished';
const STATE_FAILED = 'failed';
public const STATE_CREATED = 'created';
public const STATE_SCHEDULED = 'scheduled'; // job is scheduled to run
public const STATE_STARTED = 'started'; // job has already started and is running
public const STATE_FINISHED = 'finished';
public const STATE_FAILED = 'failed';

const CONTEXT_HERMES_MESSAGE_TYPE = 'hermes_message_type';
const CONTEXT_BEFORE_EVENT = 'before_event';
public const CONTEXT_HERMES_MESSAGE_TYPE = 'hermes_message_type';
public const CONTEXT_BEFORE_EVENT = 'before_event';

final public static function allStates(): array
{
Expand All @@ -35,16 +35,12 @@ final public static function allStates(): array

protected $tableName = 'scenarios_jobs';

private $elementStatsRepository;

public function __construct(
Explorer $database,
ElementStatsRepository $elementStatsRepository,
private ElementStatsRepository $elementStatsRepository,
Storage $cacheStorage = null
) {
parent::__construct($database, $cacheStorage);

$this->elementStatsRepository = $elementStatsRepository;
}

/**
Expand All @@ -71,8 +67,7 @@ final public function addTrigger($triggerId, array $parameters, ?array $context
$data['context'] = Json::encode($context);
}

$trigger = $this->insert($data);
return $trigger;
return $this->insert($data);
}

final public function addElement($elementId, array $parameters, ?array $context = null)
Expand All @@ -90,8 +85,7 @@ final public function addElement($elementId, array $parameters, ?array $context
$data['context'] = Json::encode($context);
}

$element = $this->insert($data);
return $element;
return $this->insert($data);
}

final public function update(ActiveRow &$row, $data)
Expand All @@ -100,9 +94,16 @@ final public function update(ActiveRow &$row, $data)
return parent::update($row, $data);
}

final public function softDelete(ActiveRow $job): void
{
$this->update($job, [
'deleted_at' => new \DateTime(),
]);
}

final public function getReadyToProcessJobs(): Selection
{
return $this->getTable()
return $this->getAllJobs()
->where('state IN (?)', [
self::STATE_CREATED, self::STATE_FINISHED, self::STATE_FAILED
]);
Expand Down Expand Up @@ -153,7 +154,7 @@ final public function finishJob(ActiveRow $row, bool $recordStats = true): Activ
'state' => self::STATE_FINISHED,
'finished_at' => new DateTime(),
]);
if ($row && $recordStats) {
if ($recordStats) {
if (!isset($row->element_id)) {
Debugger::log("JobsRepository - trying to finish job with no associated element, row data: " . Json::encode($row->toArray()), Debugger::WARNING);
} else {
Expand All @@ -171,39 +172,44 @@ final public function scheduleJob(ActiveRow $row): ActiveRow
return $row;
}

final public function getAllJobs()
final public function getAllJobs(): Selection
{
return $this->getTable()->where('scenarios_jobs.deleted_at IS NULL');
}

final public function getDeletedJobs(): Selection
{
return $this->getTable();
return $this->getTable()->where('scenarios_jobs.deleted_at IS NOT NULL');
}

final public function getUnprocessedJobs()
final public function getUnprocessedJobs(): Selection
{
return $this->getTable()->where(['state' => self::STATE_CREATED]);
return $this->getAllJobs()->where(['state' => self::STATE_CREATED]);
}

final public function getStartedJobs()
final public function getStartedJobs(): Selection
{
return $this->getTable()->where(['state' => self::STATE_STARTED]);
return $this->getAllJobs()->where(['state' => self::STATE_STARTED]);
}

final public function getScheduledJobs()
final public function getScheduledJobs(): Selection
{
return $this->getTable()->where(['state' => self::STATE_SCHEDULED]);
return $this->getAllJobs()->where(['state' => self::STATE_SCHEDULED]);
}

final public function getFinishedJobs()
final public function getFinishedJobs(): Selection
{
return $this->getTable()->where(['state' => self::STATE_FINISHED]);
return $this->getAllJobs()->where(['state' => self::STATE_FINISHED]);
}

final public function getFailedJobs()
final public function getFailedJobs(): Selection
{
return $this->getTable()->where(['state' => self::STATE_FAILED]);
return $this->getAllJobs()->where(['state' => self::STATE_FAILED]);
}

final public function getCountForElementsAndState(array $elementIds): array
{
$items = $this->getTable()->select('element_id, state, COUNT(*) AS total')
$items = $this->getAllJobs()->select('element_id, state, COUNT(*) AS total')
->where('element_id', $elementIds)
->group('element_id, state');

Expand Down
61 changes: 25 additions & 36 deletions src/engine/Engine.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,45 +32,26 @@ class Engine
public const MAX_RETRY_COUNT = 3;

// 50000us = 50ms = 0.05s
private $minSleepTime = 50000; // in microseconds
private int $minSleepTime = 50000; // in microseconds

// 1s
private $maxSleepTime = 1000000; // in microseconds
private int $maxSleepTime = 1000000; // in microseconds

private $minGraphReloadDelay = 60; // in seconds
private int $minGraphReloadDelay = 60; // in seconds

private $logger;
private ?ShutdownInterface $shutdown = null;

private $jobsRepository;

private $graphConfiguration;

private $elementsRepository;

private $hermesEmitter;

/** @var ShutdownInterface */
private $shutdown;

private $startTime;

private $triggerStatsRepository;
private DateTime $startTime;

public function __construct(
LoggerInterface $logger,
Emitter $hermesEmitter,
JobsRepository $jobsRepository,
GraphConfiguration $graphConfiguration,
ElementsRepository $elementsRepository,
TriggerStatsRepository $triggerStatsRepository
private LoggerInterface $logger,
private Emitter $hermesEmitter,
private JobsRepository $jobsRepository,
private GraphConfiguration $graphConfiguration,
private ElementsRepository $elementsRepository,
private TriggerStatsRepository $triggerStatsRepository
) {
$this->logger = $logger;
$this->jobsRepository = $jobsRepository;
$this->graphConfiguration = $graphConfiguration;
$this->elementsRepository = $elementsRepository;
$this->hermesEmitter = $hermesEmitter;
$this->startTime = new DateTime();
$this->triggerStatsRepository = $triggerStatsRepository;
}

public function setMinSleepTime(int $minSleepTime): void
Expand Down Expand Up @@ -98,6 +79,8 @@ public function run(?int $times = null): void
// For fixed amount of iterations, always reload graph
$this->graphConfiguration->reload($times !== null ? 0 : $this->minGraphReloadDelay);

// Single job processing

$jobs = $this->jobsRepository->getReadyToProcessJobsForEnabledScenarios()
->order(
'FIELD(state, ?, ?, ?), updated_at',
Expand All @@ -121,11 +104,18 @@ public function run(?int $times = null): void
}
}

// Batch job processing

$deletedCount = $this->jobsRepository->deleteUnprocessableJobsForScenarios();
if ($deletedCount) {
$this->logger->log(LogLevel::INFO, "Deleted {$deletedCount} unprocessable jobs.");
}

// jobs are first soft-deleted and then deleted later (to let asynchronous Hermes worker(s) finish)
$this->jobsRepository->getDeletedJobs()
->where(['deleted_at <= ?' => DateTime::from('-1 minute')])
->delete();

// for fixed amount of iterations, do not sleep or wait for shutdown
if ($times !== null) {
$i--;
Expand All @@ -149,7 +139,6 @@ public function run(?int $times = null): void
}
}


/**
* Calculates exponential delay.
*
Expand Down Expand Up @@ -177,10 +166,10 @@ private function processFailedJob(ActiveRow $job)

if (!$shouldRetry) {
$this->logger->log(LogLevel::ERROR, 'Failed job found and retry is not allowed, cancelling', $this->jobLoggerContext($job));
$this->jobsRepository->delete($job);
$this->jobsRepository->softDelete($job);
} elseif ($job->retry_count >= self::MAX_RETRY_COUNT) {
$this->logger->log(LogLevel::ERROR, "Failed job found, it has already failed {$job->retry_count} times, cancelling", $this->jobLoggerContext($job));
$this->jobsRepository->delete($job);
$this->jobsRepository->softDelete($job);
} else {
$this->logger->log(LogLevel::WARNING, 'Failed job found (retry allowed), rescheduling', $this->jobLoggerContext($job));
$this->jobsRepository->update($job, [
Expand Down Expand Up @@ -211,7 +200,7 @@ private function processFinishedJob(ActiveRow $job)
// This can happen if user updates running scenario
$this->logger->log(LogLevel::WARNING, $e->getMessage(), $this->jobLoggerContext($job));
} finally {
$this->jobsRepository->delete($job);
$this->jobsRepository->softDelete($job);
}
}

Expand All @@ -231,7 +220,7 @@ private function processCreatedJob(ActiveRow $job)
$this->processJobElement($job);
} else {
$this->logger->log(LogLevel::ERROR, 'Scenarios job without associated trigger or element', $this->jobLoggerContext($job));
$this->jobsRepository->delete($job);
$this->jobsRepository->softDelete($job);
}
}

Expand Down Expand Up @@ -287,7 +276,7 @@ private function processJobElement(ActiveRow $job)
}
} catch (InvalidJobException $exception) {
$this->logger->log(LogLevel::ERROR, $exception->getMessage(), $this->jobLoggerContext($job));
$this->jobsRepository->delete($job);
$this->jobsRepository->softDelete($job);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php
declare(strict_types=1);

use Phinx\Migration\AbstractMigration;

final class AddSoftDeleteToScenariosJobs extends AbstractMigration
{
public function change(): void
{
$this->table('scenarios_jobs')
->addColumn('deleted_at', 'datetime', ['null' => true, 'after' => 'updated_at'])
->addIndex('deleted_at')
->update();
}
}

0 comments on commit 2104ae9

Please sign in to comment.