From 8259b2e9cd5d3e35af5640c262b5ea027f6300f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Tue, 14 Nov 2023 22:20:22 +0100 Subject: [PATCH] TASK: Prevent multiple catchup runs Will use caches to try avoiding to even start async catchups if for that projection one is still running. Also adds a simple single slot queue with incremental backup to ensure a requested catchup definitely happens. --- ...cessProjectionCatchUpCommandController.php | 9 ++ .../SubprocessProjectionCatchUpTrigger.php | 92 +++++++++++++++++-- .../Configuration/Caches.yaml | 6 ++ .../Configuration/Objects.yaml | 9 ++ 4 files changed, 107 insertions(+), 9 deletions(-) diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php index d30d9eca6a6..775b34b570f 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php @@ -3,6 +3,7 @@ namespace Neos\ContentRepositoryRegistry\Command; +use Neos\Cache\Frontend\VariableFrontend; use Neos\ContentRepository\Core\Projection\CatchUpOptions; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; @@ -24,6 +25,13 @@ class SubprocessProjectionCatchUpCommandController extends CommandController */ protected $contentRepositoryRegistry; + /** + * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") + * @var VariableFrontend + */ + protected $catchUpStatesCache; + + /** * @param string $contentRepositoryIdentifier * @param class-string> $projectionClassName fully qualified class name of the projection to catch up @@ -33,5 +41,6 @@ public function catchupCommand(string $contentRepositoryIdentifier, string $proj { $contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier)); $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); + $this->catchUpStatesCache->remove(md5($contentRepositoryIdentifier . $projectionClassName) . 'RUNNING'); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php index b109aaf54d4..078e2c83685 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php @@ -2,6 +2,9 @@ declare(strict_types=1); namespace Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger; +use Neos\Cache\Frontend\VariableFrontend; +use Neos\ContentRepository\Core\Projection\ProjectionInterface; +use Neos\EventStore\Model\Event\SequenceNumber; use Neos\Flow\Annotations as Flow; use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; @@ -20,6 +23,12 @@ class SubprocessProjectionCatchUpTrigger implements ProjectionCatchUpTriggerInte */ protected $flowSettings; + /** + * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") + * @var VariableFrontend + */ + protected $catchUpStatesCache; + public function __construct( private readonly ContentRepositoryId $contentRepositoryId ) { @@ -29,15 +38,80 @@ public function triggerCatchUp(Projections $projections): void { // modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103 // and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php - foreach ($projections as $projection) { - Scripts::executeCommandAsync( - 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', - $this->flowSettings, - [ - 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, - 'projectionClassName' => get_class($projection) - ] - ); + $queuedProjections = array_map(fn($projection) => $this->startCatchUpWithQueueing($projection), iterator_to_array($projections->getIterator())); + $queuedProjections = array_filter($queuedProjections); + + $attempts = 0; + while (!empty($queuedProjections)) { + usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms + if (++$attempts > 50) { + throw new \RuntimeException('TIMEOUT while waiting for projections to run queued catch up.', 1550232279); + } + + foreach ($queuedProjections as $key => $projection) { + if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED') === false) { + // another process has started a catchUp while we waited, our queue has become irrelevant + unset($queuedProjections[$key]); + } + $hasStarted = $this->startCatchUp($projection); + if ($hasStarted) { + unset($queuedProjections[$key]); + $this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED'); + } + } + $queuedProjections = array_values($queuedProjections); } } + + /** + * @param ProjectionInterface $projection + * @return bool has catchUp been started for given projection + * @throws \Neos\Cache\Exception + */ + private function startCatchUp(ProjectionInterface $projection): bool + { + if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'RUNNING')) { + return false; + } + + $this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'RUNNING', 1); + // We are about to start a catchUp and can therefore discard any QUEUE that exists right now, apparently someone else is waiting for it. + $this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED'); + Scripts::executeCommandAsync( + 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', + $this->flowSettings, + [ + 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, + 'projectionClassName' => get_class($projection) + ] + ); + + return true; + } + + /** + * @param ProjectionInterface $projection + * @return ProjectionInterface|null Returns only projections that have been queued for later retry. + * @throws \Neos\Cache\Exception + */ + private function startCatchUpWithQueueing(ProjectionInterface $projection): ?ProjectionInterface + { + $catchUpStarted = $this->startCatchUp($projection); + if ($catchUpStarted) { + return null; + } + + if (!$this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED')) { + $this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'QUEUED', 1); + return $projection; + } + + return null; + } + + private function cacheKeyPrefix(ProjectionInterface $projection): string + { + $projectionClassName = get_class($projection); + return md5($this->contentRepositoryId->value . $projectionClassName); + } } diff --git a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml index 19b4e1c3b8f..faf517748cc 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml @@ -3,3 +3,9 @@ Neos_ContentGraph_DoctrineDbalAdapter_ProcessedEvents: backend: Neos\Cache\Backend\FileBackend backendOptions: defaultLifetime: 400 + +Neos_ContentRepositoryRegistry_CatchUpStates: + frontend: Neos\Cache\Frontend\VariableFrontend + backend: Neos\Cache\Backend\FileBackend + backendOptions: + defaultLifetime: 30 diff --git a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml index 564750f56d8..c3586f8147d 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml @@ -33,3 +33,12 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection value: Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjectionFactory 2: object: 'Neos\ContentRepository\Core\Infrastructure\DbalClientInterface' + + +'Neos.ContentRepositoryRegistry:CacheCatchUpStates': + className: Neos\Cache\Frontend\VariableFrontend + factoryObjectName: Neos\Flow\Cache\CacheManager + factoryMethodName: getCache + arguments: + 1: + value: Neos_ContentRepositoryRegistry_CatchUpStates