diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php index 775b34b570f..5b932dccbbc 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php @@ -10,6 +10,7 @@ use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger; use Neos\ContentRepository\Core\Factory\ContentRepositoryId; +use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState; use Neos\Flow\Annotations as Flow; use Neos\Flow\Cli\CommandController; @@ -23,13 +24,13 @@ class SubprocessProjectionCatchUpCommandController extends CommandController * @Flow\Inject * @var ContentRepositoryRegistry */ - protected $contentRepositoryRegistry; + protected ContentRepositoryRegistry $contentRepositoryRegistry; /** * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") * @var VariableFrontend */ - protected $catchUpStatesCache; + protected VariableFrontend $catchUpStatesCache; /** @@ -39,8 +40,10 @@ class SubprocessProjectionCatchUpCommandController extends CommandController */ public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void { - $contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier)); + $contentRepositoryId = ContentRepositoryId::fromString($contentRepositoryIdentifier); + $runnerState = AsynchronousCatchUpRunnerState::create($contentRepositoryId, $projectionClassName, $this->catchUpStatesCache); + $contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId); $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); - $this->catchUpStatesCache->remove(md5($contentRepositoryIdentifier . $projectionClassName) . 'RUNNING'); + $runnerState->setStopped(); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php index 01c7c8fca47..bde10f6e808 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php @@ -4,7 +4,8 @@ use Neos\Cache\Frontend\VariableFrontend; use Neos\ContentRepository\Core\Projection\ProjectionInterface; -use Neos\EventStore\Model\Event\SequenceNumber; +use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; +use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState; use Neos\Flow\Annotations as Flow; use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; @@ -38,80 +39,71 @@ public function triggerCatchUp(Projections $projections): void { // modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103 // and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php - $queuedProjections = array_map($this->startCatchUpWithQueueing(...), iterator_to_array($projections)); - $queuedProjections = array_filter($queuedProjections); - - $attempts = 0; - while (!empty($queuedProjections)) { - usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms - if (++$attempts > 50) { - throw new \RuntimeException('TIMEOUT while waiting for projections to run queued catch up.', 1550232279); + $queuedProjections = []; + foreach ($projections as $projection) { + $runnerState = AsynchronousCatchUpRunnerState::create($this->contentRepositoryId, $projection::class, $this->catchUpStatesCache); + if (!$runnerState->isRunning()) { + $this->startCatchUp($projection, $runnerState); + continue; } - foreach ($queuedProjections as $key => $projection) { - if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED') === false) { - // another process has started a catchUp while we waited, our queue has become irrelevant - unset($queuedProjections[$key]); - } - $hasStarted = $this->startCatchUp($projection); - if ($hasStarted) { - unset($queuedProjections[$key]); - $this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED'); - } + if (!$runnerState->isQueued()) { + $runnerState->queue(); + $queuedProjections[] = [$projection, $runnerState]; } - $queuedProjections = array_values($queuedProjections); + } + + for ($attempts = 0; $attempts < 50 && !empty($queuedProjections); $attempts++) { + // Incremental back off with some randomness to get a wide spread between processes. + usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms + $queuedProjections = $this->recheckQueuedProjections($queuedProjections); } } /** - * @param ProjectionInterface $projection - * @return bool has catchUp been started for given projection - * @throws \Neos\Cache\Exception + * @param array, AsynchronousCatchUpRunnerState}> $queuedProjections + * @return array, AsynchronousCatchUpRunnerState}> */ - private function startCatchUp(ProjectionInterface $projection): bool + private function recheckQueuedProjections(array $queuedProjections): array { - if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'RUNNING')) { - return false; + $nextQueuedProjections = []; + /** + * @var ProjectionInterface $projection + * @var AsynchronousCatchUpRunnerState $runnerState + */ + foreach ($queuedProjections as [$projection, $runnerState]) { + // another process has started a catchUp and cleared the queue while we waited, our queue has become irrelevant + if ($runnerState->isQueued() === false) { + continue; + } + + if ($runnerState->isRunning() === false) { + $this->startCatchUp($projection, $runnerState); + } + + $nextQueuedProjections[] = [$projection, $runnerState]; } - $this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'RUNNING', 1); - // We are about to start a catchUp and can therefore discard any QUEUE that exists right now, apparently someone else is waiting for it. - $this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED'); + return $nextQueuedProjections; + } + + /** + * @param ProjectionInterface $projection + * @param AsynchronousCatchUpRunnerState $runnerState + * @return void + */ + private function startCatchUp(ProjectionInterface $projection, AsynchronousCatchUpRunnerState $runnerState): void + { + $runnerState->run(); + // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. + $runnerState->dequeue(); Scripts::executeCommandAsync( 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', $this->flowSettings, [ 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, - 'projectionClassName' => get_class($projection) + 'projectionClassName' => $projection::class ] ); - - return true; - } - - /** - * @param ProjectionInterface $projection - * @return ProjectionInterface|null Returns only projections that have been queued for later retry. - * @throws \Neos\Cache\Exception - */ - private function startCatchUpWithQueueing(ProjectionInterface $projection): ?ProjectionInterface - { - $catchUpStarted = $this->startCatchUp($projection); - if ($catchUpStarted) { - return null; - } - - if (!$this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED')) { - $this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'QUEUED', 1); - return $projection; - } - - return null; - } - - private function cacheKeyPrefix(ProjectionInterface $projection): string - { - $projectionClassName = get_class($projection); - return md5($this->contentRepositoryId->value . $projectionClassName); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php b/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php new file mode 100644 index 00000000000..38d6fdbf8e0 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php @@ -0,0 +1,60 @@ +value . '_' . md5($projectionClassName); + $this->cacheKeyRunning = $cacheKeyPrefix . '_RUNNING'; + $this->cacheKeyQueued = $cacheKeyPrefix . '_QUEUED'; + } + + public static function create(ContentRepositoryId $contentRepositoryId, string $projectionClassName, FrontendInterface $cache): self + { + return new self($contentRepositoryId, $projectionClassName, $cache); + } + + public function isRunning(): bool + { + return $this->cache->has($this->cacheKeyRunning); + } + + public function run(): void + { + $this->cache->set($this->cacheKeyRunning, 1); + } + + public function setStopped(): void + { + $this->cache->remove($this->cacheKeyRunning); + } + + public function isQueued(): bool + { + return $this->cache->has($this->cacheKeyQueued); + } + + public function queue(): void + { + $this->cache->set($this->cacheKeyQueued, 1); + } + + public function dequeue(): void + { + $this->cache->remove($this->cacheKeyQueued); + } +}