Skip to content

Commit

Permalink
TASK: Prevent multiple catchup runs
Browse files Browse the repository at this point in the history
Will use caches to try avoiding to even start async
catchups if for that projection one is still running.
Also adds a simple single slot queue with incremental
backup to ensure a requested catchup definitely happens.
  • Loading branch information
kitsunet committed Nov 14, 2023
1 parent 983db91 commit 8259b2e
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neos\ContentRepositoryRegistry\Command;

use Neos\Cache\Frontend\VariableFrontend;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
Expand All @@ -24,6 +25,13 @@ class SubprocessProjectionCatchUpCommandController extends CommandController
*/
protected $contentRepositoryRegistry;

/**
* @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates")
* @var VariableFrontend
*/
protected $catchUpStatesCache;


/**
* @param string $contentRepositoryIdentifier
* @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName fully qualified class name of the projection to catch up
Expand All @@ -33,5 +41,6 @@ public function catchupCommand(string $contentRepositoryIdentifier, string $proj
{
$contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier));
$contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create());
$this->catchUpStatesCache->remove(md5($contentRepositoryIdentifier . $projectionClassName) . 'RUNNING');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
declare(strict_types=1);
namespace Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger;

use Neos\Cache\Frontend\VariableFrontend;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\Flow\Annotations as Flow;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
Expand All @@ -20,6 +23,12 @@ class SubprocessProjectionCatchUpTrigger implements ProjectionCatchUpTriggerInte
*/
protected $flowSettings;

/**
* @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates")
* @var VariableFrontend
*/
protected $catchUpStatesCache;

public function __construct(
private readonly ContentRepositoryId $contentRepositoryId
) {
Expand All @@ -29,15 +38,80 @@ public function triggerCatchUp(Projections $projections): void
{
// modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103
// and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php
foreach ($projections as $projection) {
Scripts::executeCommandAsync(
'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup',
$this->flowSettings,
[
'contentRepositoryIdentifier' => $this->contentRepositoryId->value,
'projectionClassName' => get_class($projection)
]
);
$queuedProjections = array_map(fn($projection) => $this->startCatchUpWithQueueing($projection), iterator_to_array($projections->getIterator()));

Check failure on line 41 in Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test linting-unit-functionaltests-mysql (deps: highest)

The internal method "Neos\ContentRepository\Core\Projection\Projections::getIterator" is called.
$queuedProjections = array_filter($queuedProjections);

$attempts = 0;
while (!empty($queuedProjections)) {
usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms
if (++$attempts > 50) {
throw new \RuntimeException('TIMEOUT while waiting for projections to run queued catch up.', 1550232279);
}

foreach ($queuedProjections as $key => $projection) {
if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED') === false) {
// another process has started a catchUp while we waited, our queue has become irrelevant
unset($queuedProjections[$key]);
}
$hasStarted = $this->startCatchUp($projection);
if ($hasStarted) {
unset($queuedProjections[$key]);
$this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED');
}
}
$queuedProjections = array_values($queuedProjections);
}
}

/**
* @param ProjectionInterface $projection
* @return bool has catchUp been started for given projection
* @throws \Neos\Cache\Exception
*/
private function startCatchUp(ProjectionInterface $projection): bool

Check failure on line 71 in Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test linting-unit-functionaltests-mysql (deps: highest)

Method Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger::startCatchUp() has parameter $projection with generic interface Neos\ContentRepository\Core\Projection\ProjectionInterface but does not specify its types: TState
{
if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'RUNNING')) {
return false;
}

$this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'RUNNING', 1);
// We are about to start a catchUp and can therefore discard any QUEUE that exists right now, apparently someone else is waiting for it.
$this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED');
Scripts::executeCommandAsync(
'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup',
$this->flowSettings,
[
'contentRepositoryIdentifier' => $this->contentRepositoryId->value,
'projectionClassName' => get_class($projection)
]
);

return true;
}

/**
* @param ProjectionInterface $projection
* @return ProjectionInterface|null Returns only projections that have been queued for later retry.
* @throws \Neos\Cache\Exception
*/
private function startCatchUpWithQueueing(ProjectionInterface $projection): ?ProjectionInterface

Check failure on line 97 in Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test linting-unit-functionaltests-mysql (deps: highest)

Method Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger::startCatchUpWithQueueing() has parameter $projection with generic interface Neos\ContentRepository\Core\Projection\ProjectionInterface but does not specify its types: TState

Check failure on line 97 in Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test linting-unit-functionaltests-mysql (deps: highest)

Method Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger::startCatchUpWithQueueing() return type with generic interface Neos\ContentRepository\Core\Projection\ProjectionInterface does not specify its types: TState
{
$catchUpStarted = $this->startCatchUp($projection);
if ($catchUpStarted) {
return null;
}

if (!$this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED')) {
$this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'QUEUED', 1);
return $projection;
}

return null;
}

private function cacheKeyPrefix(ProjectionInterface $projection): string

Check failure on line 112 in Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test linting-unit-functionaltests-mysql (deps: highest)

Method Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger::cacheKeyPrefix() has parameter $projection with generic interface Neos\ContentRepository\Core\Projection\ProjectionInterface but does not specify its types: TState
{
$projectionClassName = get_class($projection);
return md5($this->contentRepositoryId->value . $projectionClassName);
}
}
6 changes: 6 additions & 0 deletions Neos.ContentRepositoryRegistry/Configuration/Caches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ Neos_ContentGraph_DoctrineDbalAdapter_ProcessedEvents:
backend: Neos\Cache\Backend\FileBackend
backendOptions:
defaultLifetime: 400

Neos_ContentRepositoryRegistry_CatchUpStates:
frontend: Neos\Cache\Frontend\VariableFrontend
backend: Neos\Cache\Backend\FileBackend
backendOptions:
defaultLifetime: 30
9 changes: 9 additions & 0 deletions Neos.ContentRepositoryRegistry/Configuration/Objects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,12 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection
value: Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjectionFactory
2:
object: 'Neos\ContentRepository\Core\Infrastructure\DbalClientInterface'


'Neos.ContentRepositoryRegistry:CacheCatchUpStates':
className: Neos\Cache\Frontend\VariableFrontend
factoryObjectName: Neos\Flow\Cache\CacheManager
factoryMethodName: getCache
arguments:
1:
value: Neos_ContentRepositoryRegistry_CatchUpStates

0 comments on commit 8259b2e

Please sign in to comment.