Skip to content

Commit

Permalink
TASK: Prevent multiple catchup runs
Browse files Browse the repository at this point in the history
Will use caches to try avoiding to even start async
catchups if for that projection one is still running.
Also adds a simple single slot queue with incremental
backup to ensure a requested catchup definitely happens.
  • Loading branch information
kitsunet committed Nov 28, 2023
1 parent 6fb7940 commit f52910f
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neos\ContentRepositoryRegistry\Command;

use Neos\Cache\Frontend\VariableFrontend;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
Expand All @@ -24,6 +25,13 @@ class SubprocessProjectionCatchUpCommandController extends CommandController
*/
protected $contentRepositoryRegistry;

/**
* @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates")
* @var VariableFrontend
*/
protected $catchUpStatesCache;


/**
* @param string $contentRepositoryIdentifier
* @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName fully qualified class name of the projection to catch up
Expand All @@ -33,5 +41,6 @@ public function catchupCommand(string $contentRepositoryIdentifier, string $proj
{
$contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier));
$contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create());
$this->catchUpStatesCache->remove(md5($contentRepositoryIdentifier . $projectionClassName) . 'RUNNING');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
declare(strict_types=1);
namespace Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger;

use Neos\Cache\Frontend\VariableFrontend;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\Flow\Annotations as Flow;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
Expand All @@ -20,6 +23,12 @@ class SubprocessProjectionCatchUpTrigger implements ProjectionCatchUpTriggerInte
*/
protected $flowSettings;

/**
* @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates")
* @var VariableFrontend
*/
protected $catchUpStatesCache;

public function __construct(
private readonly ContentRepositoryId $contentRepositoryId
) {
Expand All @@ -29,15 +38,80 @@ public function triggerCatchUp(Projections $projections): void
{
// modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103
// and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php
foreach ($projections as $projection) {
Scripts::executeCommandAsync(
'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup',
$this->flowSettings,
[
'contentRepositoryIdentifier' => $this->contentRepositoryId->value,
'projectionClassName' => get_class($projection)
]
);
$queuedProjections = array_map(fn($projection) => $this->startCatchUpWithQueueing($projection), iterator_to_array($projections->getIterator()));
$queuedProjections = array_filter($queuedProjections);

$attempts = 0;
while (!empty($queuedProjections)) {
usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms
if (++$attempts > 50) {
throw new \RuntimeException('TIMEOUT while waiting for projections to run queued catch up.', 1550232279);
}

foreach ($queuedProjections as $key => $projection) {
if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED') === false) {
// another process has started a catchUp while we waited, our queue has become irrelevant
unset($queuedProjections[$key]);
}
$hasStarted = $this->startCatchUp($projection);
if ($hasStarted) {
unset($queuedProjections[$key]);
$this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED');
}
}
$queuedProjections = array_values($queuedProjections);
}
}

/**
* @param ProjectionInterface $projection
* @return bool has catchUp been started for given projection
* @throws \Neos\Cache\Exception
*/
private function startCatchUp(ProjectionInterface $projection): bool
{
if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'RUNNING')) {
return false;
}

$this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'RUNNING', 1);
// We are about to start a catchUp and can therefore discard any QUEUE that exists right now, apparently someone else is waiting for it.
$this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED');
Scripts::executeCommandAsync(
'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup',
$this->flowSettings,
[
'contentRepositoryIdentifier' => $this->contentRepositoryId->value,
'projectionClassName' => get_class($projection)
]
);

return true;
}

/**
* @param ProjectionInterface $projection
* @return ProjectionInterface|null Returns only projections that have been queued for later retry.
* @throws \Neos\Cache\Exception
*/
private function startCatchUpWithQueueing(ProjectionInterface $projection): ?ProjectionInterface
{
$catchUpStarted = $this->startCatchUp($projection);
if ($catchUpStarted) {
return null;
}

if (!$this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED')) {
$this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'QUEUED', 1);
return $projection;
}

return null;
}

private function cacheKeyPrefix(ProjectionInterface $projection): string
{
$projectionClassName = get_class($projection);
return md5($this->contentRepositoryId->value . $projectionClassName);
}
}
6 changes: 6 additions & 0 deletions Neos.ContentRepositoryRegistry/Configuration/Caches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ Neos_ContentGraph_DoctrineDbalAdapter_ProcessedEvents:
backend: Neos\Cache\Backend\FileBackend
backendOptions:
defaultLifetime: 400

Neos_ContentRepositoryRegistry_CatchUpStates:
frontend: Neos\Cache\Frontend\VariableFrontend
backend: Neos\Cache\Backend\FileBackend
backendOptions:
defaultLifetime: 30
9 changes: 9 additions & 0 deletions Neos.ContentRepositoryRegistry/Configuration/Objects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,12 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection
value: Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjectionFactory
2:
object: 'Neos\ContentRepository\Core\Infrastructure\DbalClientInterface'


'Neos.ContentRepositoryRegistry:CacheCatchUpStates':
className: Neos\Cache\Frontend\VariableFrontend
factoryObjectName: Neos\Flow\Cache\CacheManager
factoryMethodName: getCache
arguments:
1:
value: Neos_ContentRepositoryRegistry_CatchUpStates

0 comments on commit f52910f

Please sign in to comment.