From f52910fed3449927b00ce13dc2630136e737a869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Tue, 14 Nov 2023 22:20:22 +0100 Subject: [PATCH 1/9] TASK: Prevent multiple catchup runs Will use caches to try avoiding to even start async catchups if for that projection one is still running. Also adds a simple single slot queue with incremental backup to ensure a requested catchup definitely happens. --- ...cessProjectionCatchUpCommandController.php | 9 ++ .../SubprocessProjectionCatchUpTrigger.php | 92 +++++++++++++++++-- .../Configuration/Caches.yaml | 6 ++ .../Configuration/Objects.yaml | 9 ++ 4 files changed, 107 insertions(+), 9 deletions(-) diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php index d30d9eca6a6..775b34b570f 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php @@ -3,6 +3,7 @@ namespace Neos\ContentRepositoryRegistry\Command; +use Neos\Cache\Frontend\VariableFrontend; use Neos\ContentRepository\Core\Projection\CatchUpOptions; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; @@ -24,6 +25,13 @@ class SubprocessProjectionCatchUpCommandController extends CommandController */ protected $contentRepositoryRegistry; + /** + * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") + * @var VariableFrontend + */ + protected $catchUpStatesCache; + + /** * @param string $contentRepositoryIdentifier * @param class-string> $projectionClassName fully qualified class name of the projection to catch up @@ -33,5 +41,6 @@ public function catchupCommand(string $contentRepositoryIdentifier, string $proj { $contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier)); $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); + $this->catchUpStatesCache->remove(md5($contentRepositoryIdentifier . $projectionClassName) . 'RUNNING'); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php index b109aaf54d4..078e2c83685 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php @@ -2,6 +2,9 @@ declare(strict_types=1); namespace Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger; +use Neos\Cache\Frontend\VariableFrontend; +use Neos\ContentRepository\Core\Projection\ProjectionInterface; +use Neos\EventStore\Model\Event\SequenceNumber; use Neos\Flow\Annotations as Flow; use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; @@ -20,6 +23,12 @@ class SubprocessProjectionCatchUpTrigger implements ProjectionCatchUpTriggerInte */ protected $flowSettings; + /** + * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") + * @var VariableFrontend + */ + protected $catchUpStatesCache; + public function __construct( private readonly ContentRepositoryId $contentRepositoryId ) { @@ -29,15 +38,80 @@ public function triggerCatchUp(Projections $projections): void { // modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103 // and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php - foreach ($projections as $projection) { - Scripts::executeCommandAsync( - 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', - $this->flowSettings, - [ - 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, - 'projectionClassName' => get_class($projection) - ] - ); + $queuedProjections = array_map(fn($projection) => $this->startCatchUpWithQueueing($projection), iterator_to_array($projections->getIterator())); + $queuedProjections = array_filter($queuedProjections); + + $attempts = 0; + while (!empty($queuedProjections)) { + usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms + if (++$attempts > 50) { + throw new \RuntimeException('TIMEOUT while waiting for projections to run queued catch up.', 1550232279); + } + + foreach ($queuedProjections as $key => $projection) { + if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED') === false) { + // another process has started a catchUp while we waited, our queue has become irrelevant + unset($queuedProjections[$key]); + } + $hasStarted = $this->startCatchUp($projection); + if ($hasStarted) { + unset($queuedProjections[$key]); + $this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED'); + } + } + $queuedProjections = array_values($queuedProjections); } } + + /** + * @param ProjectionInterface $projection + * @return bool has catchUp been started for given projection + * @throws \Neos\Cache\Exception + */ + private function startCatchUp(ProjectionInterface $projection): bool + { + if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'RUNNING')) { + return false; + } + + $this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'RUNNING', 1); + // We are about to start a catchUp and can therefore discard any QUEUE that exists right now, apparently someone else is waiting for it. + $this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED'); + Scripts::executeCommandAsync( + 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', + $this->flowSettings, + [ + 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, + 'projectionClassName' => get_class($projection) + ] + ); + + return true; + } + + /** + * @param ProjectionInterface $projection + * @return ProjectionInterface|null Returns only projections that have been queued for later retry. + * @throws \Neos\Cache\Exception + */ + private function startCatchUpWithQueueing(ProjectionInterface $projection): ?ProjectionInterface + { + $catchUpStarted = $this->startCatchUp($projection); + if ($catchUpStarted) { + return null; + } + + if (!$this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED')) { + $this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'QUEUED', 1); + return $projection; + } + + return null; + } + + private function cacheKeyPrefix(ProjectionInterface $projection): string + { + $projectionClassName = get_class($projection); + return md5($this->contentRepositoryId->value . $projectionClassName); + } } diff --git a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml index 19b4e1c3b8f..faf517748cc 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml @@ -3,3 +3,9 @@ Neos_ContentGraph_DoctrineDbalAdapter_ProcessedEvents: backend: Neos\Cache\Backend\FileBackend backendOptions: defaultLifetime: 400 + +Neos_ContentRepositoryRegistry_CatchUpStates: + frontend: Neos\Cache\Frontend\VariableFrontend + backend: Neos\Cache\Backend\FileBackend + backendOptions: + defaultLifetime: 30 diff --git a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml index 564750f56d8..c3586f8147d 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml @@ -33,3 +33,12 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection value: Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjectionFactory 2: object: 'Neos\ContentRepository\Core\Infrastructure\DbalClientInterface' + + +'Neos.ContentRepositoryRegistry:CacheCatchUpStates': + className: Neos\Cache\Frontend\VariableFrontend + factoryObjectName: Neos\Flow\Cache\CacheManager + factoryMethodName: getCache + arguments: + 1: + value: Neos_ContentRepositoryRegistry_CatchUpStates From 5843e5eea4bfd82030991de4507a39d4d1830db4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20M=C3=BCller?= Date: Wed, 15 Nov 2023 10:08:36 +0100 Subject: [PATCH 2/9] Update Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php Co-authored-by: Bastian Waidelich --- .../SubprocessProjectionCatchUpTrigger.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php index 078e2c83685..01c7c8fca47 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php @@ -38,7 +38,7 @@ public function triggerCatchUp(Projections $projections): void { // modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103 // and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php - $queuedProjections = array_map(fn($projection) => $this->startCatchUpWithQueueing($projection), iterator_to_array($projections->getIterator())); + $queuedProjections = array_map($this->startCatchUpWithQueueing(...), iterator_to_array($projections)); $queuedProjections = array_filter($queuedProjections); $attempts = 0; From 1a7990ec3f93cfa92d206275ee0003bc33a733ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Wed, 15 Nov 2023 18:34:16 +0100 Subject: [PATCH 3/9] TASK: Code cleanup and refactoring --- ...cessProjectionCatchUpCommandController.php | 11 +- .../SubprocessProjectionCatchUpTrigger.php | 108 ++++++++---------- .../AsynchronousCatchUpRunnerState.php | 60 ++++++++++ 3 files changed, 117 insertions(+), 62 deletions(-) create mode 100644 Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php index 775b34b570f..5b932dccbbc 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php @@ -10,6 +10,7 @@ use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger; use Neos\ContentRepository\Core\Factory\ContentRepositoryId; +use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState; use Neos\Flow\Annotations as Flow; use Neos\Flow\Cli\CommandController; @@ -23,13 +24,13 @@ class SubprocessProjectionCatchUpCommandController extends CommandController * @Flow\Inject * @var ContentRepositoryRegistry */ - protected $contentRepositoryRegistry; + protected ContentRepositoryRegistry $contentRepositoryRegistry; /** * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") * @var VariableFrontend */ - protected $catchUpStatesCache; + protected VariableFrontend $catchUpStatesCache; /** @@ -39,8 +40,10 @@ class SubprocessProjectionCatchUpCommandController extends CommandController */ public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void { - $contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier)); + $contentRepositoryId = ContentRepositoryId::fromString($contentRepositoryIdentifier); + $runnerState = AsynchronousCatchUpRunnerState::create($contentRepositoryId, $projectionClassName, $this->catchUpStatesCache); + $contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId); $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); - $this->catchUpStatesCache->remove(md5($contentRepositoryIdentifier . $projectionClassName) . 'RUNNING'); + $runnerState->setStopped(); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php index 01c7c8fca47..bde10f6e808 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php @@ -4,7 +4,8 @@ use Neos\Cache\Frontend\VariableFrontend; use Neos\ContentRepository\Core\Projection\ProjectionInterface; -use Neos\EventStore\Model\Event\SequenceNumber; +use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; +use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState; use Neos\Flow\Annotations as Flow; use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; @@ -38,80 +39,71 @@ public function triggerCatchUp(Projections $projections): void { // modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103 // and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php - $queuedProjections = array_map($this->startCatchUpWithQueueing(...), iterator_to_array($projections)); - $queuedProjections = array_filter($queuedProjections); - - $attempts = 0; - while (!empty($queuedProjections)) { - usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms - if (++$attempts > 50) { - throw new \RuntimeException('TIMEOUT while waiting for projections to run queued catch up.', 1550232279); + $queuedProjections = []; + foreach ($projections as $projection) { + $runnerState = AsynchronousCatchUpRunnerState::create($this->contentRepositoryId, $projection::class, $this->catchUpStatesCache); + if (!$runnerState->isRunning()) { + $this->startCatchUp($projection, $runnerState); + continue; } - foreach ($queuedProjections as $key => $projection) { - if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED') === false) { - // another process has started a catchUp while we waited, our queue has become irrelevant - unset($queuedProjections[$key]); - } - $hasStarted = $this->startCatchUp($projection); - if ($hasStarted) { - unset($queuedProjections[$key]); - $this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED'); - } + if (!$runnerState->isQueued()) { + $runnerState->queue(); + $queuedProjections[] = [$projection, $runnerState]; } - $queuedProjections = array_values($queuedProjections); + } + + for ($attempts = 0; $attempts < 50 && !empty($queuedProjections); $attempts++) { + // Incremental back off with some randomness to get a wide spread between processes. + usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms + $queuedProjections = $this->recheckQueuedProjections($queuedProjections); } } /** - * @param ProjectionInterface $projection - * @return bool has catchUp been started for given projection - * @throws \Neos\Cache\Exception + * @param array, AsynchronousCatchUpRunnerState}> $queuedProjections + * @return array, AsynchronousCatchUpRunnerState}> */ - private function startCatchUp(ProjectionInterface $projection): bool + private function recheckQueuedProjections(array $queuedProjections): array { - if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'RUNNING')) { - return false; + $nextQueuedProjections = []; + /** + * @var ProjectionInterface $projection + * @var AsynchronousCatchUpRunnerState $runnerState + */ + foreach ($queuedProjections as [$projection, $runnerState]) { + // another process has started a catchUp and cleared the queue while we waited, our queue has become irrelevant + if ($runnerState->isQueued() === false) { + continue; + } + + if ($runnerState->isRunning() === false) { + $this->startCatchUp($projection, $runnerState); + } + + $nextQueuedProjections[] = [$projection, $runnerState]; } - $this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'RUNNING', 1); - // We are about to start a catchUp and can therefore discard any QUEUE that exists right now, apparently someone else is waiting for it. - $this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED'); + return $nextQueuedProjections; + } + + /** + * @param ProjectionInterface $projection + * @param AsynchronousCatchUpRunnerState $runnerState + * @return void + */ + private function startCatchUp(ProjectionInterface $projection, AsynchronousCatchUpRunnerState $runnerState): void + { + $runnerState->run(); + // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. + $runnerState->dequeue(); Scripts::executeCommandAsync( 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', $this->flowSettings, [ 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, - 'projectionClassName' => get_class($projection) + 'projectionClassName' => $projection::class ] ); - - return true; - } - - /** - * @param ProjectionInterface $projection - * @return ProjectionInterface|null Returns only projections that have been queued for later retry. - * @throws \Neos\Cache\Exception - */ - private function startCatchUpWithQueueing(ProjectionInterface $projection): ?ProjectionInterface - { - $catchUpStarted = $this->startCatchUp($projection); - if ($catchUpStarted) { - return null; - } - - if (!$this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED')) { - $this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'QUEUED', 1); - return $projection; - } - - return null; - } - - private function cacheKeyPrefix(ProjectionInterface $projection): string - { - $projectionClassName = get_class($projection); - return md5($this->contentRepositoryId->value . $projectionClassName); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php b/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php new file mode 100644 index 00000000000..38d6fdbf8e0 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php @@ -0,0 +1,60 @@ +value . '_' . md5($projectionClassName); + $this->cacheKeyRunning = $cacheKeyPrefix . '_RUNNING'; + $this->cacheKeyQueued = $cacheKeyPrefix . '_QUEUED'; + } + + public static function create(ContentRepositoryId $contentRepositoryId, string $projectionClassName, FrontendInterface $cache): self + { + return new self($contentRepositoryId, $projectionClassName, $cache); + } + + public function isRunning(): bool + { + return $this->cache->has($this->cacheKeyRunning); + } + + public function run(): void + { + $this->cache->set($this->cacheKeyRunning, 1); + } + + public function setStopped(): void + { + $this->cache->remove($this->cacheKeyRunning); + } + + public function isQueued(): bool + { + return $this->cache->has($this->cacheKeyQueued); + } + + public function queue(): void + { + $this->cache->set($this->cacheKeyQueued, 1); + } + + public function dequeue(): void + { + $this->cache->remove($this->cacheKeyQueued); + } +} From 387f409f6ca6e90dcd61a1ae1acd11d7d540a429 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Tue, 28 Nov 2023 22:13:26 +0100 Subject: [PATCH 4/9] Rewrite to central deduplication queue --- .../Classes/ContentRepository.php | 3 + .../Classes/EventStore/EventPersister.php | 7 +- .../Factory/ContentRepositoryFactory.php | 7 +- .../Classes/Projection/Projections.php | 4 + ...cessProjectionCatchUpCommandController.php | 10 - .../Classes/ContentRepositoryRegistry.php | 20 +- .../SubprocessProjectionCatchUpTrigger.php | 82 +-------- .../AsynchronousCatchUpRunnerState.php | 60 ------ .../Service/CatchUpDeduplicationQueue.php | 171 ++++++++++++++++++ .../Configuration/Objects.yaml | 1 - 10 files changed, 209 insertions(+), 156 deletions(-) delete mode 100644 Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php create mode 100644 Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index f0221f5f79d..feecf6671cd 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -36,6 +36,7 @@ use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\CatchUp\CatchUp; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Model\Event\EventMetadata; @@ -79,6 +80,7 @@ public function __construct( private readonly ContentDimensionSourceInterface $contentDimensionSource, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, ) { } @@ -191,6 +193,7 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o } $catchUp->run($eventStream); $catchUpHook?->onAfterCatchUp(); + $this->catchUpDeduplicationQueue->releaseCatchUpLock($projectionClassName); } public function setUp(): SetupResult diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index 42e17b5e591..1faa1ba4d47 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -6,16 +6,15 @@ use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\CommandHandler\PendingProjections; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\Event; use Neos\EventStore\Model\Event\EventId; use Neos\EventStore\Model\Event\EventMetadata; use Neos\EventStore\Model\Events; -use Neos\EventStore\Model\EventStore\CommitResult; /** * Internal service to persist {@see EventInterface} with the proper normalization, and triggering the @@ -27,7 +26,7 @@ final class EventPersister { public function __construct( private readonly EventStoreInterface $eventStore, - private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, private readonly EventNormalizer $eventNormalizer, private readonly Projections $projections, ) { @@ -67,7 +66,7 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult $projection->markStale(); } } - $this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections); + $this->catchUpDeduplicationQueue->requestCatchUp($pendingProjections->projections); // The CommandResult can be used to block until projections are up to date. return new CommandResult($pendingProjections, $commitResult); diff --git a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php index ddb45de2859..1061b36fe89 100644 --- a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php +++ b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php @@ -28,9 +28,9 @@ use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler; use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Psr\Clock\ClockInterface; use Symfony\Component\Serializer\Serializer; @@ -52,7 +52,7 @@ public function __construct( ContentDimensionSourceInterface $contentDimensionSource, Serializer $propertySerializer, ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory, - private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, ) { @@ -100,6 +100,7 @@ public function getOrBuild(): ContentRepository $this->projectionFactoryDependencies->contentDimensionSource, $this->userIdProvider, $this->clock, + $this->catchUpDeduplicationQueue ); } return $this->contentRepository; @@ -164,7 +165,7 @@ private function buildEventPersister(): EventPersister if (!$this->eventPersister) { $this->eventPersister = new EventPersister( $this->projectionFactoryDependencies->eventStore, - $this->projectionCatchUpTrigger, + $this->catchUpDeduplicationQueue, $this->projectionFactoryDependencies->eventNormalizer, $this->projectionsAndCatchUpHooks->projections, ); diff --git a/Neos.ContentRepository.Core/Classes/Projection/Projections.php b/Neos.ContentRepository.Core/Classes/Projection/Projections.php index e9910356908..2ecf86e7e61 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/Projections.php +++ b/Neos.ContentRepository.Core/Classes/Projection/Projections.php @@ -93,4 +93,8 @@ public function getIterator(): \Traversable { yield from $this->projections; } + public function isEmpty(): bool + { + return count($this->projections) === 0; + } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php index 5b932dccbbc..07043b26e8b 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php @@ -10,7 +10,6 @@ use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger; use Neos\ContentRepository\Core\Factory\ContentRepositoryId; -use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState; use Neos\Flow\Annotations as Flow; use Neos\Flow\Cli\CommandController; @@ -26,13 +25,6 @@ class SubprocessProjectionCatchUpCommandController extends CommandController */ protected ContentRepositoryRegistry $contentRepositoryRegistry; - /** - * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") - * @var VariableFrontend - */ - protected VariableFrontend $catchUpStatesCache; - - /** * @param string $contentRepositoryIdentifier * @param class-string> $projectionClassName fully qualified class name of the projection to catch up @@ -41,9 +33,7 @@ class SubprocessProjectionCatchUpCommandController extends CommandController public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void { $contentRepositoryId = ContentRepositoryId::fromString($contentRepositoryIdentifier); - $runnerState = AsynchronousCatchUpRunnerState::create($contentRepositoryId, $projectionClassName, $this->catchUpStatesCache); $contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId); $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); - $runnerState->setStopped(); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php index ef9882db035..433c502ee5e 100644 --- a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php +++ b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php @@ -3,6 +3,7 @@ namespace Neos\ContentRepositoryRegistry; +use Neos\Cache\Frontend\FrontendInterface; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface; use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory; @@ -14,7 +15,6 @@ use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentSubgraphInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\Node; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; use Neos\ContentRepositoryRegistry\Exception\ContentRepositoryNotFoundException; @@ -25,6 +25,7 @@ use Neos\ContentRepositoryRegistry\Factory\NodeTypeManager\NodeTypeManagerFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\ProjectionCatchUpTriggerFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\UserIdProvider\UserIdProviderFactoryInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Neos\Flow\Annotations as Flow; use Neos\Flow\ObjectManagement\ObjectManagerInterface; @@ -139,7 +140,7 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content $this->buildContentDimensionSource($contentRepositoryId, $contentRepositorySettings), $this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings), $this->buildProjectionsFactory($contentRepositoryId, $contentRepositorySettings), - $this->buildProjectionCatchUpTrigger($contentRepositoryId, $contentRepositorySettings), + $this->buildCatchUpDeduplicationQueue($contentRepositoryId, $contentRepositorySettings), $this->buildUserIdProvider($contentRepositoryId, $contentRepositorySettings), $clock, ); @@ -228,14 +229,25 @@ private function buildProjectionsFactory(ContentRepositoryId $contentRepositoryI } /** @param array $contentRepositorySettings */ - private function buildProjectionCatchUpTrigger(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ProjectionCatchUpTriggerInterface + private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): CatchUpDeduplicationQueue { isset($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']) || throw InvalidConfigurationException::fromMessage('Content repository "%s" does not have projectionCatchUpTrigger.factoryObjectName configured.', $contentRepositoryId->value); $projectionCatchUpTriggerFactory = $this->objectManager->get($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']); if (!$projectionCatchUpTriggerFactory instanceof ProjectionCatchUpTriggerFactoryInterface) { throw InvalidConfigurationException::fromMessage('projectionCatchUpTrigger.factoryObjectName for content repository "%s" is not an instance of %s but %s.', $contentRepositoryId->value, ProjectionCatchUpTriggerFactoryInterface::class, get_debug_type($projectionCatchUpTriggerFactory)); } - return $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []); + $projectionCatchUpTrigger = $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []); + + $catchUpStateCache = $this->objectManager->get('Neos.ContentRepositoryRegistry:CacheCatchUpStates'); + if (!$catchUpStateCache instanceof FrontendInterface) { + throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:CacheCatchUpStates" must provide a Cache Frontend, but is "%s".', get_debug_type($catchUpStateCache)); + } + + return new CatchUpDeduplicationQueue( + $contentRepositoryId, + $catchUpStateCache, + $projectionCatchUpTrigger + ); } /** @param array $contentRepositorySettings */ diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php index bde10f6e808..b109aaf54d4 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php @@ -2,10 +2,6 @@ declare(strict_types=1); namespace Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger; -use Neos\Cache\Frontend\VariableFrontend; -use Neos\ContentRepository\Core\Projection\ProjectionInterface; -use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; -use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState; use Neos\Flow\Annotations as Flow; use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; @@ -24,12 +20,6 @@ class SubprocessProjectionCatchUpTrigger implements ProjectionCatchUpTriggerInte */ protected $flowSettings; - /** - * @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates") - * @var VariableFrontend - */ - protected $catchUpStatesCache; - public function __construct( private readonly ContentRepositoryId $contentRepositoryId ) { @@ -39,71 +29,15 @@ public function triggerCatchUp(Projections $projections): void { // modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103 // and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php - $queuedProjections = []; foreach ($projections as $projection) { - $runnerState = AsynchronousCatchUpRunnerState::create($this->contentRepositoryId, $projection::class, $this->catchUpStatesCache); - if (!$runnerState->isRunning()) { - $this->startCatchUp($projection, $runnerState); - continue; - } - - if (!$runnerState->isQueued()) { - $runnerState->queue(); - $queuedProjections[] = [$projection, $runnerState]; - } - } - - for ($attempts = 0; $attempts < 50 && !empty($queuedProjections); $attempts++) { - // Incremental back off with some randomness to get a wide spread between processes. - usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms - $queuedProjections = $this->recheckQueuedProjections($queuedProjections); - } - } - - /** - * @param array, AsynchronousCatchUpRunnerState}> $queuedProjections - * @return array, AsynchronousCatchUpRunnerState}> - */ - private function recheckQueuedProjections(array $queuedProjections): array - { - $nextQueuedProjections = []; - /** - * @var ProjectionInterface $projection - * @var AsynchronousCatchUpRunnerState $runnerState - */ - foreach ($queuedProjections as [$projection, $runnerState]) { - // another process has started a catchUp and cleared the queue while we waited, our queue has become irrelevant - if ($runnerState->isQueued() === false) { - continue; - } - - if ($runnerState->isRunning() === false) { - $this->startCatchUp($projection, $runnerState); - } - - $nextQueuedProjections[] = [$projection, $runnerState]; + Scripts::executeCommandAsync( + 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', + $this->flowSettings, + [ + 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, + 'projectionClassName' => get_class($projection) + ] + ); } - - return $nextQueuedProjections; - } - - /** - * @param ProjectionInterface $projection - * @param AsynchronousCatchUpRunnerState $runnerState - * @return void - */ - private function startCatchUp(ProjectionInterface $projection, AsynchronousCatchUpRunnerState $runnerState): void - { - $runnerState->run(); - // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. - $runnerState->dequeue(); - Scripts::executeCommandAsync( - 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', - $this->flowSettings, - [ - 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, - 'projectionClassName' => $projection::class - ] - ); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php b/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php deleted file mode 100644 index 38d6fdbf8e0..00000000000 --- a/Neos.ContentRepositoryRegistry/Classes/Service/AsynchronousCatchUpRunnerState.php +++ /dev/null @@ -1,60 +0,0 @@ -value . '_' . md5($projectionClassName); - $this->cacheKeyRunning = $cacheKeyPrefix . '_RUNNING'; - $this->cacheKeyQueued = $cacheKeyPrefix . '_QUEUED'; - } - - public static function create(ContentRepositoryId $contentRepositoryId, string $projectionClassName, FrontendInterface $cache): self - { - return new self($contentRepositoryId, $projectionClassName, $cache); - } - - public function isRunning(): bool - { - return $this->cache->has($this->cacheKeyRunning); - } - - public function run(): void - { - $this->cache->set($this->cacheKeyRunning, 1); - } - - public function setStopped(): void - { - $this->cache->remove($this->cacheKeyRunning); - } - - public function isQueued(): bool - { - return $this->cache->has($this->cacheKeyQueued); - } - - public function queue(): void - { - $this->cache->set($this->cacheKeyQueued, 1); - } - - public function dequeue(): void - { - $this->cache->remove($this->cacheKeyQueued); - } -} diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php new file mode 100644 index 00000000000..e17fb162bf8 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php @@ -0,0 +1,171 @@ +triggerCatchUpAndReturnQueued($projections); + $attempts = 0; + /** @phpstan-ignore-next-line */ + while ($queuedProjections->isEmpty() === false) { + usleep(random_int(100, 100 + $attempts) + ($attempts * $attempts * 10)); + $queuedProjections = $this->retryQueued($queuedProjections); + $attempts++; + } + } + + /** + * @param class-string $projectionClassName + * @return void + */ + public function releaseCatchUpLock(string $projectionClassName): void + { + $this->setStopped($projectionClassName); + } + + private function triggerCatchUpAndReturnQueued(Projections $projections): Projections + { + $passToCatchUp = []; + $queuedProjections = []; + foreach ($projections as $projection) { + if (!$this->isRunning($projection::class)) { + $this->run($projection::class); + // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. + $this->dequeue($projection::class); + $passToCatchUp[] = $projection; + continue; + } + + if (!$this->isQueued($projection::class)) { + $this->queue($projection::class); + $queuedProjections[] = $projection; + } + } + + $this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($passToCatchUp)); + + return Projections::fromArray($queuedProjections); + } + + private function retryQueued(Projections $queuedProjections): Projections + { + $passToCatchUp = []; + $stillQueuedProjections = []; + foreach ($queuedProjections as $projection) { + if (!$this->isQueued($projection::class)) { + // was dequeued, we can drop it + continue; + } + + if (!$this->isRunning($projection::class)) { + $this->run($projection::class); + // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. + $this->dequeue($projection::class); + $passToCatchUp[] = $projection; + continue; + } + + $stillQueuedProjections[] = $projection; + } + + $this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($passToCatchUp)); + + return Projections::fromArray($stillQueuedProjections); + } + + /** + * @param class-string $projectionClassName + * @return bool + */ + private function isRunning(string $projectionClassName): bool + { + return $this->catchUpLock->has($this->cacheKeyRunning($projectionClassName)); + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function run(string $projectionClassName): void + { + $this->catchUpLock->set($this->cacheKeyRunning($projectionClassName), 1); + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function setStopped(string $projectionClassName): void + { + $this->catchUpLock->remove($this->cacheKeyRunning($projectionClassName)); + } + + /** + * @param class-string $projectionClassName + * @return bool + */ + private function isQueued(string $projectionClassName): bool + { + return $this->catchUpLock->has($this->cacheKeyQueued($projectionClassName)); + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function queue(string $projectionClassName): void + { + $this->catchUpLock->set($this->cacheKeyQueued($projectionClassName), 1); + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function dequeue(string $projectionClassName): void + { + $this->catchUpLock->remove($this->cacheKeyQueued($projectionClassName)); + } + + /** + * @param class-string $projectionClassName + * @return string + */ + private function cacheKeyPrefix(string $projectionClassName): string + { + return $this->contentRepositoryId->value . '_' . md5($projectionClassName); + } + + /** + * @param class-string $projectionClassName + * @return string + */ + private function cacheKeyRunning(string $projectionClassName): string + { + return $this->cacheKeyPrefix($projectionClassName) . '_RUNNING'; + } + + /** + * @param class-string $projectionClassName + * @return string + */ + private function cacheKeyQueued(string $projectionClassName): string + { + return $this->cacheKeyPrefix($projectionClassName) . '_QUEUED'; + } +} diff --git a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml index c3586f8147d..eb4073d9403 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml @@ -34,7 +34,6 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection 2: object: 'Neos\ContentRepository\Core\Infrastructure\DbalClientInterface' - 'Neos.ContentRepositoryRegistry:CacheCatchUpStates': className: Neos\Cache\Frontend\VariableFrontend factoryObjectName: Neos\Flow\Cache\CacheManager From 1f33d3cfa58ffb6ff6b36b6d8670c2b91fa996f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Wed, 29 Nov 2023 14:29:50 +0100 Subject: [PATCH 5/9] Fix style issues --- .../Classes/Projection/Projections.php | 3 ++- ...cessProjectionCatchUpCommandController.php | 14 +++++-------- .../Service/CatchUpDeduplicationQueue.php | 21 +++++++------------ 3 files changed, 14 insertions(+), 24 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Projection/Projections.php b/Neos.ContentRepository.Core/Classes/Projection/Projections.php index 2ecf86e7e61..a507f0284c9 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/Projections.php +++ b/Neos.ContentRepository.Core/Classes/Projection/Projections.php @@ -93,8 +93,9 @@ public function getIterator(): \Traversable { yield from $this->projections; } + public function isEmpty(): bool { - return count($this->projections) === 0; + return $this->projections === []; } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php index 07043b26e8b..064373a5444 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php @@ -3,14 +3,12 @@ namespace Neos\ContentRepositoryRegistry\Command; -use Neos\Cache\Frontend\VariableFrontend; use Neos\ContentRepository\Core\Projection\CatchUpOptions; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger; use Neos\ContentRepository\Core\Factory\ContentRepositoryId; -use Neos\Flow\Annotations as Flow; use Neos\Flow\Cli\CommandController; /** @@ -19,11 +17,10 @@ */ class SubprocessProjectionCatchUpCommandController extends CommandController { - /** - * @Flow\Inject - * @var ContentRepositoryRegistry - */ - protected ContentRepositoryRegistry $contentRepositoryRegistry; + public function __construct(private readonly ContentRepositoryRegistry $contentRepositoryRegistry) + { + parent::__construct(); + } /** * @param string $contentRepositoryIdentifier @@ -32,8 +29,7 @@ class SubprocessProjectionCatchUpCommandController extends CommandController */ public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void { - $contentRepositoryId = ContentRepositoryId::fromString($contentRepositoryIdentifier); - $contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId); + $contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier)); $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php index e17fb162bf8..7633a6356de 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php +++ b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php @@ -7,7 +7,9 @@ use Neos\ContentRepository\Core\Projection\Projections; /** - * + * This encapsulates logic to provide exactly once catchUps + * across multiple processes, leaving the way catchUps are done + * to the ProjectionCatchUpTriggerInterface. */ final readonly class CatchUpDeduplicationQueue { @@ -35,19 +37,19 @@ public function requestCatchUp(Projections $projections): void */ public function releaseCatchUpLock(string $projectionClassName): void { - $this->setStopped($projectionClassName); + $this->catchUpLock->remove($this->cacheKeyRunning($projectionClassName)); } private function triggerCatchUpAndReturnQueued(Projections $projections): Projections { - $passToCatchUp = []; + $projectionsToCatchUp = []; $queuedProjections = []; foreach ($projections as $projection) { if (!$this->isRunning($projection::class)) { $this->run($projection::class); // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. $this->dequeue($projection::class); - $passToCatchUp[] = $projection; + $projectionsToCatchUp[] = $projection; continue; } @@ -57,7 +59,7 @@ private function triggerCatchUpAndReturnQueued(Projections $projections): Projec } } - $this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($passToCatchUp)); + $this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($projectionsToCatchUp)); return Projections::fromArray($queuedProjections); } @@ -106,15 +108,6 @@ private function run(string $projectionClassName): void $this->catchUpLock->set($this->cacheKeyRunning($projectionClassName), 1); } - /** - * @param class-string $projectionClassName - * @return void - */ - private function setStopped(string $projectionClassName): void - { - $this->catchUpLock->remove($this->cacheKeyRunning($projectionClassName)); - } - /** * @param class-string $projectionClassName * @return bool From 5e31584c658ceb2cf45b71936759891bd5f3c945 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Wed, 29 Nov 2023 15:17:18 +0100 Subject: [PATCH 6/9] Stop waiting for queued catchups after 20 seconds --- .../Classes/Service/CatchUpDeduplicationQueue.php | 11 ++++++++++- .../Configuration/Caches.yaml | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php index 7633a6356de..a2ad2da5e34 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php +++ b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php @@ -22,12 +22,21 @@ public function __construct( public function requestCatchUp(Projections $projections): void { $queuedProjections = $this->triggerCatchUpAndReturnQueued($projections); + + /* + * Due to the random nature of the retry delay (see below) + * we define an absolute time limit spend in this loop as + * end condition in case we cannot resolve the queue before. + * */ + $startTime = $currentTime = microtime(true); $attempts = 0; /** @phpstan-ignore-next-line */ - while ($queuedProjections->isEmpty() === false) { + while ($queuedProjections->isEmpty() === false && ($currentTime - $startTime) < 20) { + // incrementally slower retries with some randomness to allow for tie breaks if parallel processes are in this loop. usleep(random_int(100, 100 + $attempts) + ($attempts * $attempts * 10)); $queuedProjections = $this->retryQueued($queuedProjections); $attempts++; + $currentTime = microtime(true); } } diff --git a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml index faf517748cc..168bfdd1775 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml @@ -8,4 +8,4 @@ Neos_ContentRepositoryRegistry_CatchUpStates: frontend: Neos\Cache\Frontend\VariableFrontend backend: Neos\Cache\Backend\FileBackend backendOptions: - defaultLifetime: 30 + defaultLifetime: 20 From 5299fccf852d0de2f93af5945632dffdf5039730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Wed, 6 Dec 2023 09:35:17 +0100 Subject: [PATCH 7/9] Implement CatchUpDeduplication with locks Using symfony/lock ensures atomic locking, which should really prevent duplication even under load. --- .../Classes/ContentRepositoryRegistry.php | 17 ++++- .../Service/CatchUpDeduplicationQueue.php | 73 ++++--------------- .../Configuration/Objects.yaml | 17 +++++ Neos.ContentRepositoryRegistry/composer.json | 3 +- composer.json | 1 + 5 files changed, 49 insertions(+), 62 deletions(-) diff --git a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php index 433c502ee5e..745b5947e41 100644 --- a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php +++ b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php @@ -32,6 +32,9 @@ use Neos\Utility\Arrays; use Neos\Utility\PositionalArraySorter; use Psr\Clock\ClockInterface; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\PersistingStoreInterface; +use Symfony\Component\Lock\Store\DoctrineDbalStore; use Symfony\Component\Serializer\Normalizer\DenormalizerInterface; use Symfony\Component\Serializer\Normalizer\NormalizerInterface; use Symfony\Component\Serializer\Serializer; @@ -238,14 +241,20 @@ private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepo } $projectionCatchUpTrigger = $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []); - $catchUpStateCache = $this->objectManager->get('Neos.ContentRepositoryRegistry:CacheCatchUpStates'); - if (!$catchUpStateCache instanceof FrontendInterface) { - throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:CacheCatchUpStates" must provide a Cache Frontend, but is "%s".', get_debug_type($catchUpStateCache)); + $catchUpStateLockStorage = $this->objectManager->get('Neos.ContentRepositoryRegistry:QueueLockStorage'); + if (!$catchUpStateLockStorage instanceof PersistingStoreInterface) { + throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:QueueLockStorage" must provide a \Symfony\Component\Lock\PersistingStoreInterface, but is "%s".', get_debug_type($catchUpStateLockStorage)); + } + if ($catchUpStateLockStorage instanceof DoctrineDbalStore) { + try { + // hack to ensure tables exist for Dbal + $catchUpStateLockStorage->createTable(); + } catch (\Doctrine\DBAL\Exception\TableExistsException $_) {} } return new CatchUpDeduplicationQueue( $contentRepositoryId, - $catchUpStateCache, + new LockFactory($catchUpStateLockStorage), $projectionCatchUpTrigger ); } diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php index a2ad2da5e34..7d97f538a68 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php +++ b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php @@ -1,10 +1,10 @@ catchUpLock->remove($this->cacheKeyRunning($projectionClassName)); + $runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projectionClassName)); + $runningLock->isAcquired() && $runningLock->release(); } private function triggerCatchUpAndReturnQueued(Projections $projections): Projections @@ -54,16 +55,16 @@ private function triggerCatchUpAndReturnQueued(Projections $projections): Projec $projectionsToCatchUp = []; $queuedProjections = []; foreach ($projections as $projection) { - if (!$this->isRunning($projection::class)) { - $this->run($projection::class); + $runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projection::class)); + $queuedLock = $this->lockFactoy->createLock($this->cacheKeyQueued($projection::class)); + if ($runningLock->acquire()) { // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. - $this->dequeue($projection::class); + $queuedLock->release(); $projectionsToCatchUp[] = $projection; continue; } - if (!$this->isQueued($projection::class)) { - $this->queue($projection::class); + if ($queuedLock->acquire()) { $queuedProjections[] = $projection; } } @@ -77,16 +78,19 @@ private function retryQueued(Projections $queuedProjections): Projections { $passToCatchUp = []; $stillQueuedProjections = []; + foreach ($queuedProjections as $projection) { - if (!$this->isQueued($projection::class)) { + $runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projection::class)); + $queuedLock = $this->lockFactoy->createLock($this->cacheKeyQueued($projection::class)); + + if (!$queuedLock->isAcquired()) { // was dequeued, we can drop it continue; } - if (!$this->isRunning($projection::class)) { - $this->run($projection::class); + if ($runningLock->acquire()) { // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. - $this->dequeue($projection::class); + $queuedLock->release(); $passToCatchUp[] = $projection; continue; } @@ -99,51 +103,6 @@ private function retryQueued(Projections $queuedProjections): Projections return Projections::fromArray($stillQueuedProjections); } - /** - * @param class-string $projectionClassName - * @return bool - */ - private function isRunning(string $projectionClassName): bool - { - return $this->catchUpLock->has($this->cacheKeyRunning($projectionClassName)); - } - - /** - * @param class-string $projectionClassName - * @return void - */ - private function run(string $projectionClassName): void - { - $this->catchUpLock->set($this->cacheKeyRunning($projectionClassName), 1); - } - - /** - * @param class-string $projectionClassName - * @return bool - */ - private function isQueued(string $projectionClassName): bool - { - return $this->catchUpLock->has($this->cacheKeyQueued($projectionClassName)); - } - - /** - * @param class-string $projectionClassName - * @return void - */ - private function queue(string $projectionClassName): void - { - $this->catchUpLock->set($this->cacheKeyQueued($projectionClassName), 1); - } - - /** - * @param class-string $projectionClassName - * @return void - */ - private function dequeue(string $projectionClassName): void - { - $this->catchUpLock->remove($this->cacheKeyQueued($projectionClassName)); - } - /** * @param class-string $projectionClassName * @return string diff --git a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml index eb4073d9403..9184d28497e 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml @@ -41,3 +41,20 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection arguments: 1: value: Neos_ContentRepositoryRegistry_CatchUpStates + +'Neos.ContentRepositoryRegistry:DbalConnection': + className: Doctrine\DBAL\Connection + factoryObjectName: Neos\ContentRepositoryRegistry\DoctrineDbalClient\DoctrineDbalClient + factoryMethodName: getConnection + +'Neos.ContentRepositoryRegistry:QueueLockStorage': + className: Symfony\Component\Lock\Store\DoctrineDbalStore + arguments: + 1: + object: 'Neos.ContentRepositoryRegistry:DbalConnection' + +'Neos.ContentRepositoryRegistry:QueueLockFactory': + className: Symfony\Component\Lock\LockFactory + arguments: + 1: + object: 'Neos.ContentRepositoryRegistry:QueueLockStorage' diff --git a/Neos.ContentRepositoryRegistry/composer.json b/Neos.ContentRepositoryRegistry/composer.json index 9a095a3f034..5bc23165b84 100644 --- a/Neos.ContentRepositoryRegistry/composer.json +++ b/Neos.ContentRepositoryRegistry/composer.json @@ -17,7 +17,8 @@ "neos/contentrepository-core": "self.version", "neos/contentrepositoryregistry-storageclient": "self.version", "symfony/property-access": "^5.4|^6.0", - "psr/clock": "^1" + "psr/clock": "^1", + "symfony/lock": "^6.0.0" }, "autoload": { "psr-4": { diff --git a/composer.json b/composer.json index e5bcc219ed7..d18f68a9c97 100644 --- a/composer.json +++ b/composer.json @@ -26,6 +26,7 @@ "neos/behat": "^9.0", "neos/contentrepositoryregistry-storageclient": "self.version", "symfony/property-access": "^5.4|^6.0", + "symfony/lock": "^6.0.0", "neos/fluid-adaptor": "*", "neos/cache": "*", "neos/eel": "*", From 20ccab113a3ca9435362c01284f90e19f4fa7fc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Wed, 20 Dec 2023 18:26:50 +0100 Subject: [PATCH 8/9] Split catchUp queue and run lock --- .../Classes/ContentRepository.php | 73 ++++++++------ .../Factory/ContentRepositoryFactory.php | 5 +- .../ProjectionCatchUpLockIdentifier.php | 28 ++++++ Neos.ContentRepository.Core/composer.json | 1 + .../Classes/ContentRepositoryRegistry.php | 3 +- .../Service/CatchUpDeduplicationQueue.php | 97 ++++++++++++------- .../Service/ProjectionReplayService.php | 7 +- .../Configuration/Objects.yaml | 6 -- .../Service/CatchUpDeduplicationQueueTest.php | 76 +++++++++++++++ .../Fixture/FakeFileCatchUpTrigger.php | 66 +++++++++++++ .../Service/Fixture/FakeProjectionState.php | 17 ++++ .../Fixture/FakeProjectionWithState.php | 46 +++++++++ 12 files changed, 347 insertions(+), 78 deletions(-) create mode 100644 Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php create mode 100644 Neos.ContentRepositoryRegistry/Tests/Unit/Service/CatchUpDeduplicationQueueTest.php create mode 100644 Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeFileCatchUpTrigger.php create mode 100644 Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionState.php create mode 100644 Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionWithState.php diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index feecf6671cd..525604d4ddb 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -31,12 +31,12 @@ use Neos\ContentRepository\Core\Projection\CatchUpOptions; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface; use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder; +use Neos\ContentRepository\Core\Projection\ProjectionCatchUpLockIdentifier; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks; use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; -use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\CatchUp\CatchUp; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Model\Event\EventMetadata; @@ -45,6 +45,7 @@ use Neos\EventStore\Model\EventStream\VirtualStreamName; use Neos\EventStore\ProvidesSetupInterface; use Psr\Clock\ClockInterface; +use Symfony\Component\Lock\LockFactory; /** * Main Entry Point to the system. Encapsulates the full event-sourced Content Repository. @@ -64,7 +65,6 @@ final class ContentRepository */ private array $projectionStateCache; - /** * @internal use the {@see ContentRepositoryFactory::getOrBuild()} to instantiate */ @@ -80,9 +80,8 @@ public function __construct( private readonly ContentDimensionSourceInterface $contentDimensionSource, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, - private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, - ) { - } + private readonly LockFactory $lockFactory, + ) {} /** * The only API to send commands (mutation intentions) to the system. @@ -160,40 +159,50 @@ public function projectionState(string $projectionStateClassName): ProjectionSta */ public function catchUpProjection(string $projectionClassName, CatchUpOptions $options): void { - $projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName); + $lock = $this->lockFactory->createLock( + ProjectionCatchUpLockIdentifier::createRunning($this->id, $projectionClassName)->value + ); + if (!$lock->acquire()) { + return; + } - $catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection); - $catchUpHook = $catchUpHookFactory?->build($this); + try { + $projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName); - // TODO allow custom stream name per projection - $streamName = VirtualStreamName::all(); - $eventStream = $this->eventStore->load($streamName); - if ($options->maximumSequenceNumber !== null) { - $eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber); - } + $catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection); + $catchUpHook = $catchUpHookFactory?->build($this); - $eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) { - $event = $this->eventNormalizer->denormalize($eventEnvelope->event); - if ($options->progressCallback !== null) { - ($options->progressCallback)($event, $eventEnvelope); - } - if (!$projection->canHandle($event)) { - return; + // TODO allow custom stream name per projection + $streamName = VirtualStreamName::all(); + $eventStream = $this->eventStore->load($streamName); + if ($options->maximumSequenceNumber !== null) { + $eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber); } - $catchUpHook?->onBeforeEvent($event, $eventEnvelope); - $projection->apply($event, $eventEnvelope); - $catchUpHook?->onAfterEvent($event, $eventEnvelope); - }; - $catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage()); + $eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) { + $event = $this->eventNormalizer->denormalize($eventEnvelope->event); + if ($options->progressCallback !== null) { + ($options->progressCallback)($event, $eventEnvelope); + } + if (!$projection->canHandle($event)) { + return; + } + $catchUpHook?->onBeforeEvent($event, $eventEnvelope); + $projection->apply($event, $eventEnvelope); + $catchUpHook?->onAfterEvent($event, $eventEnvelope); + }; - if ($catchUpHook !== null) { - $catchUpHook->onBeforeCatchUp(); - $catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted()); + $catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage()); + + if ($catchUpHook !== null) { + $catchUpHook->onBeforeCatchUp(); + $catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted()); + } + $catchUp->run($eventStream); + $catchUpHook?->onAfterCatchUp(); + } finally { + $lock->release(); } - $catchUp->run($eventStream); - $catchUpHook?->onAfterCatchUp(); - $this->catchUpDeduplicationQueue->releaseCatchUpLock($projectionClassName); } public function setUp(): SetupResult diff --git a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php index 1061b36fe89..2d1f321aa96 100644 --- a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php +++ b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php @@ -33,6 +33,8 @@ use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Psr\Clock\ClockInterface; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\PersistingStoreInterface; use Symfony\Component\Serializer\Serializer; /** @@ -55,6 +57,7 @@ public function __construct( private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, + private readonly PersistingStoreInterface $lockStorage ) { $contentDimensionZookeeper = new ContentDimensionZookeeper($contentDimensionSource); $interDimensionalVariationGraph = new InterDimensionalVariationGraph( @@ -100,7 +103,7 @@ public function getOrBuild(): ContentRepository $this->projectionFactoryDependencies->contentDimensionSource, $this->userIdProvider, $this->clock, - $this->catchUpDeduplicationQueue + new LockFactory($this->lockStorage) ); } return $this->contentRepository; diff --git a/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php b/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php new file mode 100644 index 00000000000..6cd32a5bd41 --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php @@ -0,0 +1,28 @@ +value, $projectionClassName)); + } + + public static function createRunning(ContentRepositoryId $contentRepositoryId, string $projectionClassName): self + { + return new self(self::generateIdentifier($contentRepositoryId, $projectionClassName) . 'RUN'); + } + + public static function createQueued(ContentRepositoryId $contentRepositoryId, string $projectionClassName): self + { + return new self(self::generateIdentifier($contentRepositoryId, $projectionClassName) . 'QUEUE'); + } +} diff --git a/Neos.ContentRepository.Core/composer.json b/Neos.ContentRepository.Core/composer.json index f1256f0a53b..d7af1439793 100644 --- a/Neos.ContentRepository.Core/composer.json +++ b/Neos.ContentRepository.Core/composer.json @@ -19,6 +19,7 @@ "neos/utility-arrays": "*", "doctrine/dbal": "^2.13", "symfony/serializer": "^6.3", + "symfony/lock": "^6.0.0", "psr/clock": "^1", "behat/transliterator": "~1.0", "ramsey/uuid": "^3.0 || ^4.0" diff --git a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php index 745b5947e41..731a4cf226b 100644 --- a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php +++ b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php @@ -146,6 +146,7 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content $this->buildCatchUpDeduplicationQueue($contentRepositoryId, $contentRepositorySettings), $this->buildUserIdProvider($contentRepositoryId, $contentRepositorySettings), $clock, + $this->objectManager->get('Neos.ContentRepositoryRegistry:QueueLockStorage') ); } catch (\Exception $exception) { throw InvalidConfigurationException::fromException($contentRepositoryId, $exception); @@ -254,7 +255,7 @@ private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepo return new CatchUpDeduplicationQueue( $contentRepositoryId, - new LockFactory($catchUpStateLockStorage), + $catchUpStateLockStorage, $projectionCatchUpTrigger ); } diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php index 7d97f538a68..31cc71214c3 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php +++ b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php @@ -2,22 +2,37 @@ namespace Neos\ContentRepositoryRegistry\Service; use Neos\ContentRepository\Core\Factory\ContentRepositoryId; +use Neos\ContentRepository\Core\Projection\ProjectionCatchUpLockIdentifier; use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; +use Symfony\Component\Lock\Exception\LockAcquiringException; +use Symfony\Component\Lock\Exception\LockConflictedException; +use Symfony\Component\Lock\Exception\LockReleasingException; +use Symfony\Component\Lock\Key; +use Symfony\Component\Lock\Lock; use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\LockInterface; +use Symfony\Component\Lock\PersistingStoreInterface; /** - * This encapsulates logic to provide exactly once catchUps - * across multiple processes, leaving the way catchUps are done - * to the ProjectionCatchUpTriggerInterface. + * This encapsulates logic to prevent _some_ duplicate catch requests + * across multiple processes. It utilizes a shared lock to ignore more than + * a single catchUp request which will be executed as soon as possible, + * which means when there is no lock on the catchUp itself. The catchUp locks + * it's "running" state internally to prevent concurrency issues on the projections, + * in here we just check if that lock was already acquired (and thus a catchUp is running). */ -final readonly class CatchUpDeduplicationQueue +final class CatchUpDeduplicationQueue { + private LockFactory $lockFactory; + public function __construct( - private ContentRepositoryId $contentRepositoryId, - private LockFactory $lockFactoy, - private ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger - ) {} + private readonly ContentRepositoryId $contentRepositoryId, + private readonly PersistingStoreInterface $lockStorage, + private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger + ) { + $this->lockFactory = new LockFactory($this->lockStorage); + } public function requestCatchUp(Projections $projections): void { @@ -40,31 +55,20 @@ public function requestCatchUp(Projections $projections): void } } - /** - * @param class-string $projectionClassName - * @return void - */ - public function releaseCatchUpLock(string $projectionClassName): void - { - $runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projectionClassName)); - $runningLock->isAcquired() && $runningLock->release(); - } - private function triggerCatchUpAndReturnQueued(Projections $projections): Projections { $projectionsToCatchUp = []; $queuedProjections = []; foreach ($projections as $projection) { - $runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projection::class)); - $queuedLock = $this->lockFactoy->createLock($this->cacheKeyQueued($projection::class)); - if ($runningLock->acquire()) { + $runLock = $this->lockFactory->createLock(ProjectionCatchUpLockIdentifier::createRunning($this->contentRepositoryId, $projection::class)->value); + if (!$runLock->isAcquired()) { // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. - $queuedLock->release(); + $this->tryReleaseQueuedLock($projection::class); $projectionsToCatchUp[] = $projection; continue; } - if ($queuedLock->acquire()) { + if ($this->tryAcquireQueuedLock($projection::class)) { $queuedProjections[] = $projection; } } @@ -80,17 +84,18 @@ private function retryQueued(Projections $queuedProjections): Projections $stillQueuedProjections = []; foreach ($queuedProjections as $projection) { - $runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projection::class)); - $queuedLock = $this->lockFactoy->createLock($this->cacheKeyQueued($projection::class)); + $queuedLock = $this->queuedLock($projection::class); if (!$queuedLock->isAcquired()) { // was dequeued, we can drop it continue; } - if ($runningLock->acquire()) { + $runLock = $this->lockFactory->createLock(ProjectionCatchUpLockIdentifier::createRunning($this->contentRepositoryId, $projection::class)->value); + + if ($runLock->isAcquired() === false) { // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. - $queuedLock->release(); + $this->tryReleaseQueuedLock($projection::class); $passToCatchUp[] = $projection; continue; } @@ -105,28 +110,48 @@ private function retryQueued(Projections $queuedProjections): Projections /** * @param class-string $projectionClassName - * @return string + * @return bool */ - private function cacheKeyPrefix(string $projectionClassName): string + private function tryAcquireQueuedLock(string $projectionClassName): bool { - return $this->contentRepositoryId->value . '_' . md5($projectionClassName); + $queuedLock = $this->queuedLock($projectionClassName); + try { + return $queuedLock->acquire(); + } catch (LockConflictedException|LockAcquiringException $_) { + return false; + } } /** * @param class-string $projectionClassName - * @return string + * @return void */ - private function cacheKeyRunning(string $projectionClassName): string + private function tryReleaseQueuedLock(string $projectionClassName): void { - return $this->cacheKeyPrefix($projectionClassName) . '_RUNNING'; + $queuedLock = $this->queuedLock($projectionClassName); + try { + $queuedLock->release(); + } catch (LockReleasingException $e) { + // lock might already be released, this is fine + } } /** * @param class-string $projectionClassName - * @return string + * @return LockInterface */ - private function cacheKeyQueued(string $projectionClassName): string + private function queuedLock(string $projectionClassName): LockInterface { - return $this->cacheKeyPrefix($projectionClassName) . '_QUEUED'; + $lockIdentifierQueued = ProjectionCatchUpLockIdentifier::createQueued($this->contentRepositoryId, $projectionClassName)->value; + $key = $this->createLockWithKeyState($lockIdentifierQueued, md5($lockIdentifierQueued)); + return new Lock($key, $this->lockStorage, 30.0); + } + + private function createLockWithKeyState(string $lockResource, string $keyState): Key + { + $key = new Key($lockResource); + $key->setState($this->lockStorage::class, $keyState); + + return $key; } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/ProjectionReplayService.php b/Neos.ContentRepositoryRegistry/Classes/Service/ProjectionReplayService.php index 9dec35da709..acbb23cfda4 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Service/ProjectionReplayService.php +++ b/Neos.ContentRepositoryRegistry/Classes/Service/ProjectionReplayService.php @@ -21,6 +21,7 @@ final class ProjectionReplayService implements ContentRepositoryServiceInterface public function __construct( private readonly Projections $projections, private readonly ContentRepository $contentRepository, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue ) { } @@ -28,14 +29,16 @@ public function replayProjection(string $projectionAliasOrClassName, CatchUpOpti { $projectionClassName = $this->resolveProjectionClassName($projectionAliasOrClassName); $this->contentRepository->resetProjectionState($projectionClassName); - $this->contentRepository->catchUpProjection($projectionClassName, $options); + $this->catchUpDeduplicationQueue->requestCatchUp(Projections::fromArray([$projectionClassName])); } public function replayAllProjections(CatchUpOptions $options): void { + $projectionsArray = []; foreach ($this->projectionClassNamesAndAliases() as $classNamesAndAlias) { $this->contentRepository->resetProjectionState($classNamesAndAlias['className']); - $this->contentRepository->catchUpProjection($classNamesAndAlias['className'], $options); + $projectionsArray[] = $classNamesAndAlias['className']; + $this->catchUpDeduplicationQueue->requestCatchUp(Projections::fromArray($projectionsArray)); } } diff --git a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml index 9184d28497e..8e74651259f 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml @@ -52,9 +52,3 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection arguments: 1: object: 'Neos.ContentRepositoryRegistry:DbalConnection' - -'Neos.ContentRepositoryRegistry:QueueLockFactory': - className: Symfony\Component\Lock\LockFactory - arguments: - 1: - object: 'Neos.ContentRepositoryRegistry:QueueLockStorage' diff --git a/Neos.ContentRepositoryRegistry/Tests/Unit/Service/CatchUpDeduplicationQueueTest.php b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/CatchUpDeduplicationQueueTest.php new file mode 100644 index 00000000000..35820bd6ef6 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/CatchUpDeduplicationQueueTest.php @@ -0,0 +1,76 @@ +requestCatchUp(Projections::fromArray([new FakeProjectionWithState($topic)])); + } + self::assertTrue(true); + } + + /** + * @group parallel_check + * @test + */ + public function validateEvents(): void + { + self::assertFileExists(FakeFileCatchUpTrigger::queueFilename()); + $lines = file(FakeFileCatchUpTrigger::queueFilename(), FILE_IGNORE_NEW_LINES) ?: []; + unlink(FakeFileCatchUpTrigger::queueFilename()); + $countersPerTopic = []; + self::assertGreaterThan(30, count($lines)); + foreach ($lines as $index => $line) { + [$topic, $runNumber, $counter] = explode(':', $line); + // we are ignoring the run number, it's more for diagnostic purposes, the order is not defined and can be assumed random. + $countersPerTopic[$topic] = $countersPerTopic[$topic] ?? 0; + self::assertSame($countersPerTopic[$topic], (int)$counter - 1, sprintf('Failed for topic "%s" in line %d:', $topic, $index + 1)); + $countersPerTopic[$topic] = (int)$counter; + } + + } + + private static function either(...$choices): string + { + return (string)$choices[array_rand($choices)]; + } +} + + diff --git a/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeFileCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeFileCatchUpTrigger.php new file mode 100644 index 00000000000..523a7d4542b --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeFileCatchUpTrigger.php @@ -0,0 +1,66 @@ +catchUp($projection); + } + } + + private function catchUp(ProjectionInterface $projection) + { + $lockFactory = new LockFactory(self::lockStore()); + $lock = $lockFactory->createLock(ProjectionCatchUpLockIdentifier::createRunning($this->contentRepositoryId, $projection::class)->value); + if (!$lock->acquire()) { + return; + } + try { + /** @phpstan-ignore-next-line */ + [$projectionTopic, $projectionRun] = explode(':', $projection->getState()->state); + $lines = file_exists(self::queueFilename()) ? file(self::queueFilename(), FILE_IGNORE_NEW_LINES) : []; + $newLines = []; + $counter = 0; + foreach ($lines as $line) { + $newLines[] = trim($line) . PHP_EOL; + [$topic, $run, $lineCounter] = explode(':', trim($line)); + if ($topic !== $projectionTopic) { + continue; + } + $counter = (int)$lineCounter; + } + $newLines[] = implode(':', [$projectionTopic, $projectionRun, $counter + 1]) . PHP_EOL; + file_put_contents(self::queueFilename(), $newLines); + } finally { + $lock->release(); + } + } + + public static function lockStore(): PersistingStoreInterface + { + return new PdoStore('sqlite:/tmp/neos_deduplication_test.db'); + } + + public static function queueFilename(): string + { + return '/tmp/neos_deduplication_test_projection.txt'; + } +} diff --git a/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionState.php b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionState.php new file mode 100644 index 00000000000..db32de6db49 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionState.php @@ -0,0 +1,17 @@ +state; + } +} diff --git a/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionWithState.php b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionWithState.php new file mode 100644 index 00000000000..76a2fda1f14 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionWithState.php @@ -0,0 +1,46 @@ +state); + } + + public function reset(): void + { + } + +} From e731e314d653736dac5119c1ca2a7acd395dbd67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Mu=CC=88ller?= Date: Thu, 21 Dec 2023 00:52:04 +0100 Subject: [PATCH 9/9] Fix style issues --- Neos.ContentRepository.Core/Classes/ContentRepository.php | 3 ++- .../Classes/Projection/ProjectionCatchUpLockIdentifier.php | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 525604d4ddb..ad4b73c0815 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -81,7 +81,8 @@ public function __construct( private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, private readonly LockFactory $lockFactory, - ) {} + ) { + } /** * The only API to send commands (mutation intentions) to the system. diff --git a/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php b/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php index 6cd32a5bd41..e80ef22adc5 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php +++ b/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php @@ -1,4 +1,5 @@