diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index f0221f5f79d..feecf6671cd 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -36,6 +36,7 @@ use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\CatchUp\CatchUp; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Model\Event\EventMetadata; @@ -79,6 +80,7 @@ public function __construct( private readonly ContentDimensionSourceInterface $contentDimensionSource, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, ) { } @@ -191,6 +193,7 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o } $catchUp->run($eventStream); $catchUpHook?->onAfterCatchUp(); + $this->catchUpDeduplicationQueue->releaseCatchUpLock($projectionClassName); } public function setUp(): SetupResult diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index 42e17b5e591..1faa1ba4d47 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -6,16 +6,15 @@ use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\CommandHandler\PendingProjections; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\Event; use Neos\EventStore\Model\Event\EventId; use Neos\EventStore\Model\Event\EventMetadata; use Neos\EventStore\Model\Events; -use Neos\EventStore\Model\EventStore\CommitResult; /** * Internal service to persist {@see EventInterface} with the proper normalization, and triggering the @@ -27,7 +26,7 @@ final class EventPersister { public function __construct( private readonly EventStoreInterface $eventStore, - private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, private readonly EventNormalizer $eventNormalizer, private readonly Projections $projections, ) { @@ -67,7 +66,7 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult $projection->markStale(); } } - $this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections); + $this->catchUpDeduplicationQueue->requestCatchUp($pendingProjections->projections); // The CommandResult can be used to block until projections are up to date. return new CommandResult($pendingProjections, $commitResult); diff --git a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php index ddb45de2859..1061b36fe89 100644 --- a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php +++ b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php @@ -28,9 +28,9 @@ use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler; use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Psr\Clock\ClockInterface; use Symfony\Component\Serializer\Serializer; @@ -52,7 +52,7 @@ public function __construct( ContentDimensionSourceInterface $contentDimensionSource, Serializer $propertySerializer, ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory, - private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, ) { @@ -100,6 +100,7 @@ public function getOrBuild(): ContentRepository $this->projectionFactoryDependencies->contentDimensionSource, $this->userIdProvider, $this->clock, + $this->catchUpDeduplicationQueue ); } return $this->contentRepository; @@ -164,7 +165,7 @@ private function buildEventPersister(): EventPersister if (!$this->eventPersister) { $this->eventPersister = new EventPersister( $this->projectionFactoryDependencies->eventStore, - $this->projectionCatchUpTrigger, + $this->catchUpDeduplicationQueue, $this->projectionFactoryDependencies->eventNormalizer, $this->projectionsAndCatchUpHooks->projections, ); diff --git a/Neos.ContentRepository.Core/Classes/Projection/Projections.php b/Neos.ContentRepository.Core/Classes/Projection/Projections.php index e9910356908..2ecf86e7e61 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/Projections.php +++ b/Neos.ContentRepository.Core/Classes/Projection/Projections.php @@ -93,4 +93,8 @@ public function getIterator(): \Traversable { yield from $this->projections; } + public function isEmpty(): bool + { + return count($this->projections) === 0; + } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php index 5b932dccbbc..07043b26e8b 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php @@ -10,7 +10,6 @@ use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger; use Neos\ContentRepository\Core\Factory\ContentRepositoryId; -use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState; use Neos\Flow\Annotations as Flow; use Neos\Flow\Cli\CommandController; @@ -26,13 +25,6 @@ class SubprocessProjectionCatchUpCommandController extends CommandController */ protected ContentRepositoryRegistry $contentRepositoryRegistry; - /** - * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") - * @var VariableFrontend - */ - protected VariableFrontend $catchUpStatesCache; - - /** * @param string $contentRepositoryIdentifier * @param class-string> $projectionClassName fully qualified class name of the projection to catch up @@ -41,9 +33,7 @@ class SubprocessProjectionCatchUpCommandController extends CommandController public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void { $contentRepositoryId = ContentRepositoryId::fromString($contentRepositoryIdentifier); - $runnerState = AsynchronousCatchUpRunnerState::create($contentRepositoryId, $projectionClassName, $this->catchUpStatesCache); $contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId); $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); - $runnerState->setStopped(); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php index ef9882db035..433c502ee5e 100644 --- a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php +++ b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php @@ -3,6 +3,7 @@ namespace Neos\ContentRepositoryRegistry; +use Neos\Cache\Frontend\FrontendInterface; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface; use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory; @@ -14,7 +15,6 @@ use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentSubgraphInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\Node; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; use Neos\ContentRepositoryRegistry\Exception\ContentRepositoryNotFoundException; @@ -25,6 +25,7 @@ use Neos\ContentRepositoryRegistry\Factory\NodeTypeManager\NodeTypeManagerFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\ProjectionCatchUpTriggerFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\UserIdProvider\UserIdProviderFactoryInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Neos\Flow\Annotations as Flow; use Neos\Flow\ObjectManagement\ObjectManagerInterface; @@ -139,7 +140,7 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content $this->buildContentDimensionSource($contentRepositoryId, $contentRepositorySettings), $this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings), $this->buildProjectionsFactory($contentRepositoryId, $contentRepositorySettings), - $this->buildProjectionCatchUpTrigger($contentRepositoryId, $contentRepositorySettings), + $this->buildCatchUpDeduplicationQueue($contentRepositoryId, $contentRepositorySettings), $this->buildUserIdProvider($contentRepositoryId, $contentRepositorySettings), $clock, ); @@ -228,14 +229,25 @@ private function buildProjectionsFactory(ContentRepositoryId $contentRepositoryI } /** @param array $contentRepositorySettings */ - private function buildProjectionCatchUpTrigger(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ProjectionCatchUpTriggerInterface + private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): CatchUpDeduplicationQueue { isset($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']) || throw InvalidConfigurationException::fromMessage('Content repository "%s" does not have projectionCatchUpTrigger.factoryObjectName configured.', $contentRepositoryId->value); $projectionCatchUpTriggerFactory = $this->objectManager->get($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']); if (!$projectionCatchUpTriggerFactory instanceof ProjectionCatchUpTriggerFactoryInterface) { throw InvalidConfigurationException::fromMessage('projectionCatchUpTrigger.factoryObjectName for content repository "%s" is not an instance of %s but %s.', $contentRepositoryId->value, ProjectionCatchUpTriggerFactoryInterface::class, get_debug_type($projectionCatchUpTriggerFactory)); } - return $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []); + $projectionCatchUpTrigger = $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []); + + $catchUpStateCache = $this->objectManager->get('Neos.ContentRepositoryRegistry:CacheCatchUpStates'); + if (!$catchUpStateCache instanceof FrontendInterface) { + throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:CacheCatchUpStates" must provide a Cache Frontend, but is "%s".', get_debug_type($catchUpStateCache)); + } + + return new CatchUpDeduplicationQueue( + $contentRepositoryId, + $catchUpStateCache, + $projectionCatchUpTrigger + ); } /** @param array $contentRepositorySettings */ diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php index bde10f6e808..b109aaf54d4 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php @@ -2,10 +2,6 @@ declare(strict_types=1); namespace Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger; -use Neos\Cache\Frontend\VariableFrontend; -use Neos\ContentRepository\Core\Projection\ProjectionInterface; -use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; -use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState; use Neos\Flow\Annotations as Flow; use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; @@ -24,12 +20,6 @@ class SubprocessProjectionCatchUpTrigger implements ProjectionCatchUpTriggerInte */ protected $flowSettings; - /** - * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") - * @var VariableFrontend - */ - protected $catchUpStatesCache; - public function __construct( private readonly ContentRepositoryId $contentRepositoryId ) { @@ -39,71 +29,15 @@ public function triggerCatchUp(Projections $projections): void { // modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103 // and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php - $queuedProjections = []; foreach ($projections as $projection) { - $runnerState = AsynchronousCatchUpRunnerState::create($this->contentRepositoryId, $projection::class, $this->catchUpStatesCache); - if (!$runnerState->isRunning()) { - $this->startCatchUp($projection, $runnerState); - continue; - } - - if (!$runnerState->isQueued()) { - $runnerState->queue(); - $queuedProjections[] = [$projection, $runnerState]; - } - } - - for ($attempts = 0; $attempts < 50 && !empty($queuedProjections); $attempts++) { - // Incremental back off with some randomness to get a wide spread between processes. - usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms - $queuedProjections = $this->recheckQueuedProjections($queuedProjections); - } - } - - /** - * @param array, AsynchronousCatchUpRunnerState}> $queuedProjections - * @return array, AsynchronousCatchUpRunnerState}> - */ - private function recheckQueuedProjections(array $queuedProjections): array - { - $nextQueuedProjections = []; - /** - * @var ProjectionInterface $projection - * @var AsynchronousCatchUpRunnerState $runnerState - */ - foreach ($queuedProjections as [$projection, $runnerState]) { - // another process has started a catchUp and cleared the queue while we waited, our queue has become irrelevant - if ($runnerState->isQueued() === false) { - continue; - } - - if ($runnerState->isRunning() === false) { - $this->startCatchUp($projection, $runnerState); - } - - $nextQueuedProjections[] = [$projection, $runnerState]; + Scripts::executeCommandAsync( + 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', + $this->flowSettings, + [ + 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, + 'projectionClassName' => get_class($projection) + ] + ); } - - return $nextQueuedProjections; - } - - /** - * @param ProjectionInterface $projection - * @param AsynchronousCatchUpRunnerState $runnerState - * @return void - */ - private function startCatchUp(ProjectionInterface $projection, AsynchronousCatchUpRunnerState $runnerState): void - { - $runnerState->run(); - // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. - $runnerState->dequeue(); - Scripts::executeCommandAsync( - 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', - $this->flowSettings, - [ - 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, - 'projectionClassName' => $projection::class - ] - ); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php b/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php deleted file mode 100644 index 38d6fdbf8e0..00000000000 --- a/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php +++ /dev/null @@ -1,60 +0,0 @@ -value . '_' . md5($projectionClassName); - $this->cacheKeyRunning = $cacheKeyPrefix . '_RUNNING'; - $this->cacheKeyQueued = $cacheKeyPrefix . '_QUEUED'; - } - - public static function create(ContentRepositoryId $contentRepositoryId, string $projectionClassName, FrontendInterface $cache): self - { - return new self($contentRepositoryId, $projectionClassName, $cache); - } - - public function isRunning(): bool - { - return $this->cache->has($this->cacheKeyRunning); - } - - public function run(): void - { - $this->cache->set($this->cacheKeyRunning, 1); - } - - public function setStopped(): void - { - $this->cache->remove($this->cacheKeyRunning); - } - - public function isQueued(): bool - { - return $this->cache->has($this->cacheKeyQueued); - } - - public function queue(): void - { - $this->cache->set($this->cacheKeyQueued, 1); - } - - public function dequeue(): void - { - $this->cache->remove($this->cacheKeyQueued); - } -} diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php new file mode 100644 index 00000000000..e17fb162bf8 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php @@ -0,0 +1,171 @@ +triggerCatchUpAndReturnQueued($projections); + $attempts = 0; + /** @phpstan-ignore-next-line */ + while ($queuedProjections->isEmpty() === false) { + usleep(random_int(100, 100 + $attempts) + ($attempts * $attempts * 10)); + $queuedProjections = $this->retryQueued($queuedProjections); + $attempts++; + } + } + + /** + * @param class-string $projectionClassName + * @return void + */ + public function releaseCatchUpLock(string $projectionClassName): void + { + $this->setStopped($projectionClassName); + } + + private function triggerCatchUpAndReturnQueued(Projections $projections): Projections + { + $passToCatchUp = []; + $queuedProjections = []; + foreach ($projections as $projection) { + if (!$this->isRunning($projection::class)) { + $this->run($projection::class); + // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. + $this->dequeue($projection::class); + $passToCatchUp[] = $projection; + continue; + } + + if (!$this->isQueued($projection::class)) { + $this->queue($projection::class); + $queuedProjections[] = $projection; + } + } + + $this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($passToCatchUp)); + + return Projections::fromArray($queuedProjections); + } + + private function retryQueued(Projections $queuedProjections): Projections + { + $passToCatchUp = []; + $stillQueuedProjections = []; + foreach ($queuedProjections as $projection) { + if (!$this->isQueued($projection::class)) { + // was dequeued, we can drop it + continue; + } + + if (!$this->isRunning($projection::class)) { + $this->run($projection::class); + // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. + $this->dequeue($projection::class); + $passToCatchUp[] = $projection; + continue; + } + + $stillQueuedProjections[] = $projection; + } + + $this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($passToCatchUp)); + + return Projections::fromArray($stillQueuedProjections); + } + + /** + * @param class-string $projectionClassName + * @return bool + */ + private function isRunning(string $projectionClassName): bool + { + return $this->catchUpLock->has($this->cacheKeyRunning($projectionClassName)); + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function run(string $projectionClassName): void + { + $this->catchUpLock->set($this->cacheKeyRunning($projectionClassName), 1); + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function setStopped(string $projectionClassName): void + { + $this->catchUpLock->remove($this->cacheKeyRunning($projectionClassName)); + } + + /** + * @param class-string $projectionClassName + * @return bool + */ + private function isQueued(string $projectionClassName): bool + { + return $this->catchUpLock->has($this->cacheKeyQueued($projectionClassName)); + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function queue(string $projectionClassName): void + { + $this->catchUpLock->set($this->cacheKeyQueued($projectionClassName), 1); + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function dequeue(string $projectionClassName): void + { + $this->catchUpLock->remove($this->cacheKeyQueued($projectionClassName)); + } + + /** + * @param class-string $projectionClassName + * @return string + */ + private function cacheKeyPrefix(string $projectionClassName): string + { + return $this->contentRepositoryId->value . '_' . md5($projectionClassName); + } + + /** + * @param class-string $projectionClassName + * @return string + */ + private function cacheKeyRunning(string $projectionClassName): string + { + return $this->cacheKeyPrefix($projectionClassName) . '_RUNNING'; + } + + /** + * @param class-string $projectionClassName + * @return string + */ + private function cacheKeyQueued(string $projectionClassName): string + { + return $this->cacheKeyPrefix($projectionClassName) . '_QUEUED'; + } +} diff --git a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml index c3586f8147d..eb4073d9403 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml @@ -34,7 +34,6 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection 2: object: 'Neos\ContentRepository\Core\Infrastructure\DbalClientInterface' - 'Neos.ContentRepositoryRegistry:CacheCatchUpStates': className: Neos\Cache\Frontend\VariableFrontend factoryObjectName: Neos\Flow\Cache\CacheManager