diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index f0221f5f79d..ad4b73c0815 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -31,6 +31,7 @@ use Neos\ContentRepository\Core\Projection\CatchUpOptions; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface; use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder; +use Neos\ContentRepository\Core\Projection\ProjectionCatchUpLockIdentifier; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks; use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; @@ -44,6 +45,7 @@ use Neos\EventStore\Model\EventStream\VirtualStreamName; use Neos\EventStore\ProvidesSetupInterface; use Psr\Clock\ClockInterface; +use Symfony\Component\Lock\LockFactory; /** * Main Entry Point to the system. Encapsulates the full event-sourced Content Repository. @@ -63,7 +65,6 @@ final class ContentRepository */ private array $projectionStateCache; - /** * @internal use the {@see ContentRepositoryFactory::getOrBuild()} to instantiate */ @@ -79,6 +80,7 @@ public function __construct( private readonly ContentDimensionSourceInterface $contentDimensionSource, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, + private readonly LockFactory $lockFactory, ) { } @@ -158,39 +160,50 @@ public function projectionState(string $projectionStateClassName): ProjectionSta */ public function catchUpProjection(string $projectionClassName, CatchUpOptions $options): void { - $projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName); + $lock = $this->lockFactory->createLock( + ProjectionCatchUpLockIdentifier::createRunning($this->id, $projectionClassName)->value + ); + if (!$lock->acquire()) { + return; + } - $catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection); - $catchUpHook = $catchUpHookFactory?->build($this); + try { + $projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName); - // TODO allow custom stream name per projection - $streamName = VirtualStreamName::all(); - $eventStream = $this->eventStore->load($streamName); - if ($options->maximumSequenceNumber !== null) { - $eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber); - } + $catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection); + $catchUpHook = $catchUpHookFactory?->build($this); - $eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) { - $event = $this->eventNormalizer->denormalize($eventEnvelope->event); - if ($options->progressCallback !== null) { - ($options->progressCallback)($event, $eventEnvelope); - } - if (!$projection->canHandle($event)) { - return; + // TODO allow custom stream name per projection + $streamName = VirtualStreamName::all(); + $eventStream = $this->eventStore->load($streamName); + if ($options->maximumSequenceNumber !== null) { + $eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber); } - $catchUpHook?->onBeforeEvent($event, $eventEnvelope); - $projection->apply($event, $eventEnvelope); - $catchUpHook?->onAfterEvent($event, $eventEnvelope); - }; - $catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage()); + $eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) { + $event = $this->eventNormalizer->denormalize($eventEnvelope->event); + if ($options->progressCallback !== null) { + ($options->progressCallback)($event, $eventEnvelope); + } + if (!$projection->canHandle($event)) { + return; + } + $catchUpHook?->onBeforeEvent($event, $eventEnvelope); + $projection->apply($event, $eventEnvelope); + $catchUpHook?->onAfterEvent($event, $eventEnvelope); + }; - if ($catchUpHook !== null) { - $catchUpHook->onBeforeCatchUp(); - $catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted()); + $catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage()); + + if ($catchUpHook !== null) { + $catchUpHook->onBeforeCatchUp(); + $catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted()); + } + $catchUp->run($eventStream); + $catchUpHook?->onAfterCatchUp(); + } finally { + $lock->release(); } - $catchUp->run($eventStream); - $catchUpHook?->onAfterCatchUp(); } public function setUp(): SetupResult diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index 42e17b5e591..1faa1ba4d47 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -6,16 +6,15 @@ use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\CommandHandler\PendingProjections; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\Projections; use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; use Neos\EventStore\Model\Event; use Neos\EventStore\Model\Event\EventId; use Neos\EventStore\Model\Event\EventMetadata; use Neos\EventStore\Model\Events; -use Neos\EventStore\Model\EventStore\CommitResult; /** * Internal service to persist {@see EventInterface} with the proper normalization, and triggering the @@ -27,7 +26,7 @@ final class EventPersister { public function __construct( private readonly EventStoreInterface $eventStore, - private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, private readonly EventNormalizer $eventNormalizer, private readonly Projections $projections, ) { @@ -67,7 +66,7 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult $projection->markStale(); } } - $this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections); + $this->catchUpDeduplicationQueue->requestCatchUp($pendingProjections->projections); // The CommandResult can be used to block until projections are up to date. return new CommandResult($pendingProjections, $commitResult); diff --git a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php index ddb45de2859..2d1f321aa96 100644 --- a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php +++ b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php @@ -28,11 +28,13 @@ use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler; use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Psr\Clock\ClockInterface; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\PersistingStoreInterface; use Symfony\Component\Serializer\Serializer; /** @@ -52,9 +54,10 @@ public function __construct( ContentDimensionSourceInterface $contentDimensionSource, Serializer $propertySerializer, ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory, - private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, + private readonly PersistingStoreInterface $lockStorage ) { $contentDimensionZookeeper = new ContentDimensionZookeeper($contentDimensionSource); $interDimensionalVariationGraph = new InterDimensionalVariationGraph( @@ -100,6 +103,7 @@ public function getOrBuild(): ContentRepository $this->projectionFactoryDependencies->contentDimensionSource, $this->userIdProvider, $this->clock, + new LockFactory($this->lockStorage) ); } return $this->contentRepository; @@ -164,7 +168,7 @@ private function buildEventPersister(): EventPersister if (!$this->eventPersister) { $this->eventPersister = new EventPersister( $this->projectionFactoryDependencies->eventStore, - $this->projectionCatchUpTrigger, + $this->catchUpDeduplicationQueue, $this->projectionFactoryDependencies->eventNormalizer, $this->projectionsAndCatchUpHooks->projections, ); diff --git a/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php b/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php new file mode 100644 index 00000000000..e80ef22adc5 --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php @@ -0,0 +1,30 @@ +value, $projectionClassName)); + } + + public static function createRunning(ContentRepositoryId $contentRepositoryId, string $projectionClassName): self + { + return new self(self::generateIdentifier($contentRepositoryId, $projectionClassName) . 'RUN'); + } + + public static function createQueued(ContentRepositoryId $contentRepositoryId, string $projectionClassName): self + { + return new self(self::generateIdentifier($contentRepositoryId, $projectionClassName) . 'QUEUE'); + } +} diff --git a/Neos.ContentRepository.Core/Classes/Projection/Projections.php b/Neos.ContentRepository.Core/Classes/Projection/Projections.php index e9910356908..a507f0284c9 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/Projections.php +++ b/Neos.ContentRepository.Core/Classes/Projection/Projections.php @@ -93,4 +93,9 @@ public function getIterator(): \Traversable { yield from $this->projections; } + + public function isEmpty(): bool + { + return $this->projections === []; + } } diff --git a/Neos.ContentRepository.Core/composer.json b/Neos.ContentRepository.Core/composer.json index f1256f0a53b..d7af1439793 100644 --- a/Neos.ContentRepository.Core/composer.json +++ b/Neos.ContentRepository.Core/composer.json @@ -19,6 +19,7 @@ "neos/utility-arrays": "*", "doctrine/dbal": "^2.13", "symfony/serializer": "^6.3", + "symfony/lock": "^6.0.0", "psr/clock": "^1", "behat/transliterator": "~1.0", "ramsey/uuid": "^3.0 || ^4.0" diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php index d30d9eca6a6..064373a5444 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php @@ -9,7 +9,6 @@ use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger; use Neos\ContentRepository\Core\Factory\ContentRepositoryId; -use Neos\Flow\Annotations as Flow; use Neos\Flow\Cli\CommandController; /** @@ -18,11 +17,10 @@ */ class SubprocessProjectionCatchUpCommandController extends CommandController { - /** - * @Flow\Inject - * @var ContentRepositoryRegistry - */ - protected $contentRepositoryRegistry; + public function __construct(private readonly ContentRepositoryRegistry $contentRepositoryRegistry) + { + parent::__construct(); + } /** * @param string $contentRepositoryIdentifier diff --git a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php index ef9882db035..731a4cf226b 100644 --- a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php +++ b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php @@ -3,6 +3,7 @@ namespace Neos\ContentRepositoryRegistry; +use Neos\Cache\Frontend\FrontendInterface; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface; use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory; @@ -14,7 +15,6 @@ use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentSubgraphInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\Node; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; use Neos\ContentRepositoryRegistry\Exception\ContentRepositoryNotFoundException; @@ -25,12 +25,16 @@ use Neos\ContentRepositoryRegistry\Factory\NodeTypeManager\NodeTypeManagerFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\ProjectionCatchUpTriggerFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\UserIdProvider\UserIdProviderFactoryInterface; +use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue; use Neos\EventStore\EventStoreInterface; use Neos\Flow\Annotations as Flow; use Neos\Flow\ObjectManagement\ObjectManagerInterface; use Neos\Utility\Arrays; use Neos\Utility\PositionalArraySorter; use Psr\Clock\ClockInterface; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\PersistingStoreInterface; +use Symfony\Component\Lock\Store\DoctrineDbalStore; use Symfony\Component\Serializer\Normalizer\DenormalizerInterface; use Symfony\Component\Serializer\Normalizer\NormalizerInterface; use Symfony\Component\Serializer\Serializer; @@ -139,9 +143,10 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content $this->buildContentDimensionSource($contentRepositoryId, $contentRepositorySettings), $this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings), $this->buildProjectionsFactory($contentRepositoryId, $contentRepositorySettings), - $this->buildProjectionCatchUpTrigger($contentRepositoryId, $contentRepositorySettings), + $this->buildCatchUpDeduplicationQueue($contentRepositoryId, $contentRepositorySettings), $this->buildUserIdProvider($contentRepositoryId, $contentRepositorySettings), $clock, + $this->objectManager->get('Neos.ContentRepositoryRegistry:QueueLockStorage') ); } catch (\Exception $exception) { throw InvalidConfigurationException::fromException($contentRepositoryId, $exception); @@ -228,14 +233,31 @@ private function buildProjectionsFactory(ContentRepositoryId $contentRepositoryI } /** @param array $contentRepositorySettings */ - private function buildProjectionCatchUpTrigger(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ProjectionCatchUpTriggerInterface + private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): CatchUpDeduplicationQueue { isset($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']) || throw InvalidConfigurationException::fromMessage('Content repository "%s" does not have projectionCatchUpTrigger.factoryObjectName configured.', $contentRepositoryId->value); $projectionCatchUpTriggerFactory = $this->objectManager->get($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']); if (!$projectionCatchUpTriggerFactory instanceof ProjectionCatchUpTriggerFactoryInterface) { throw InvalidConfigurationException::fromMessage('projectionCatchUpTrigger.factoryObjectName for content repository "%s" is not an instance of %s but %s.', $contentRepositoryId->value, ProjectionCatchUpTriggerFactoryInterface::class, get_debug_type($projectionCatchUpTriggerFactory)); } - return $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []); + $projectionCatchUpTrigger = $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []); + + $catchUpStateLockStorage = $this->objectManager->get('Neos.ContentRepositoryRegistry:QueueLockStorage'); + if (!$catchUpStateLockStorage instanceof PersistingStoreInterface) { + throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:QueueLockStorage" must provide a \Symfony\Component\Lock\PersistingStoreInterface, but is "%s".', get_debug_type($catchUpStateLockStorage)); + } + if ($catchUpStateLockStorage instanceof DoctrineDbalStore) { + try { + // hack to ensure tables exist for Dbal + $catchUpStateLockStorage->createTable(); + } catch (\Doctrine\DBAL\Exception\TableExistsException $_) {} + } + + return new CatchUpDeduplicationQueue( + $contentRepositoryId, + $catchUpStateLockStorage, + $projectionCatchUpTrigger + ); } /** @param array $contentRepositorySettings */ diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php new file mode 100644 index 00000000000..31cc71214c3 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Classes/Service/CatchUpDeduplicationQueue.php @@ -0,0 +1,157 @@ +lockFactory = new LockFactory($this->lockStorage); + } + + public function requestCatchUp(Projections $projections): void + { + $queuedProjections = $this->triggerCatchUpAndReturnQueued($projections); + + /* + * Due to the random nature of the retry delay (see below) + * we define an absolute time limit spend in this loop as + * end condition in case we cannot resolve the queue before. + * */ + $startTime = $currentTime = microtime(true); + $attempts = 0; + /** @phpstan-ignore-next-line */ + while ($queuedProjections->isEmpty() === false && ($currentTime - $startTime) < 20) { + // incrementally slower retries with some randomness to allow for tie breaks if parallel processes are in this loop. + usleep(random_int(100, 100 + $attempts) + ($attempts * $attempts * 10)); + $queuedProjections = $this->retryQueued($queuedProjections); + $attempts++; + $currentTime = microtime(true); + } + } + + private function triggerCatchUpAndReturnQueued(Projections $projections): Projections + { + $projectionsToCatchUp = []; + $queuedProjections = []; + foreach ($projections as $projection) { + $runLock = $this->lockFactory->createLock(ProjectionCatchUpLockIdentifier::createRunning($this->contentRepositoryId, $projection::class)->value); + if (!$runLock->isAcquired()) { + // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. + $this->tryReleaseQueuedLock($projection::class); + $projectionsToCatchUp[] = $projection; + continue; + } + + if ($this->tryAcquireQueuedLock($projection::class)) { + $queuedProjections[] = $projection; + } + } + + $this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($projectionsToCatchUp)); + + return Projections::fromArray($queuedProjections); + } + + private function retryQueued(Projections $queuedProjections): Projections + { + $passToCatchUp = []; + $stillQueuedProjections = []; + + foreach ($queuedProjections as $projection) { + $queuedLock = $this->queuedLock($projection::class); + + if (!$queuedLock->isAcquired()) { + // was dequeued, we can drop it + continue; + } + + $runLock = $this->lockFactory->createLock(ProjectionCatchUpLockIdentifier::createRunning($this->contentRepositoryId, $projection::class)->value); + + if ($runLock->isAcquired() === false) { + // We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it. + $this->tryReleaseQueuedLock($projection::class); + $passToCatchUp[] = $projection; + continue; + } + + $stillQueuedProjections[] = $projection; + } + + $this->projectionCatchUpTrigger->triggerCatchUp(Projections::fromArray($passToCatchUp)); + + return Projections::fromArray($stillQueuedProjections); + } + + /** + * @param class-string $projectionClassName + * @return bool + */ + private function tryAcquireQueuedLock(string $projectionClassName): bool + { + $queuedLock = $this->queuedLock($projectionClassName); + try { + return $queuedLock->acquire(); + } catch (LockConflictedException|LockAcquiringException $_) { + return false; + } + } + + /** + * @param class-string $projectionClassName + * @return void + */ + private function tryReleaseQueuedLock(string $projectionClassName): void + { + $queuedLock = $this->queuedLock($projectionClassName); + try { + $queuedLock->release(); + } catch (LockReleasingException $e) { + // lock might already be released, this is fine + } + } + + /** + * @param class-string $projectionClassName + * @return LockInterface + */ + private function queuedLock(string $projectionClassName): LockInterface + { + $lockIdentifierQueued = ProjectionCatchUpLockIdentifier::createQueued($this->contentRepositoryId, $projectionClassName)->value; + $key = $this->createLockWithKeyState($lockIdentifierQueued, md5($lockIdentifierQueued)); + return new Lock($key, $this->lockStorage, 30.0); + } + + private function createLockWithKeyState(string $lockResource, string $keyState): Key + { + $key = new Key($lockResource); + $key->setState($this->lockStorage::class, $keyState); + + return $key; + } +} diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/ProjectionReplayService.php b/Neos.ContentRepositoryRegistry/Classes/Service/ProjectionReplayService.php index 9dec35da709..acbb23cfda4 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Service/ProjectionReplayService.php +++ b/Neos.ContentRepositoryRegistry/Classes/Service/ProjectionReplayService.php @@ -21,6 +21,7 @@ final class ProjectionReplayService implements ContentRepositoryServiceInterface public function __construct( private readonly Projections $projections, private readonly ContentRepository $contentRepository, + private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue ) { } @@ -28,14 +29,16 @@ public function replayProjection(string $projectionAliasOrClassName, CatchUpOpti { $projectionClassName = $this->resolveProjectionClassName($projectionAliasOrClassName); $this->contentRepository->resetProjectionState($projectionClassName); - $this->contentRepository->catchUpProjection($projectionClassName, $options); + $this->catchUpDeduplicationQueue->requestCatchUp(Projections::fromArray([$projectionClassName])); } public function replayAllProjections(CatchUpOptions $options): void { + $projectionsArray = []; foreach ($this->projectionClassNamesAndAliases() as $classNamesAndAlias) { $this->contentRepository->resetProjectionState($classNamesAndAlias['className']); - $this->contentRepository->catchUpProjection($classNamesAndAlias['className'], $options); + $projectionsArray[] = $classNamesAndAlias['className']; + $this->catchUpDeduplicationQueue->requestCatchUp(Projections::fromArray($projectionsArray)); } } diff --git a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml index 19b4e1c3b8f..168bfdd1775 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml @@ -3,3 +3,9 @@ Neos_ContentGraph_DoctrineDbalAdapter_ProcessedEvents: backend: Neos\Cache\Backend\FileBackend backendOptions: defaultLifetime: 400 + +Neos_ContentRepositoryRegistry_CatchUpStates: + frontend: Neos\Cache\Frontend\VariableFrontend + backend: Neos\Cache\Backend\FileBackend + backendOptions: + defaultLifetime: 20 diff --git a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml index 564750f56d8..8e74651259f 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Objects.yaml @@ -33,3 +33,22 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection value: Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjectionFactory 2: object: 'Neos\ContentRepository\Core\Infrastructure\DbalClientInterface' + +'Neos.ContentRepositoryRegistry:CacheCatchUpStates': + className: Neos\Cache\Frontend\VariableFrontend + factoryObjectName: Neos\Flow\Cache\CacheManager + factoryMethodName: getCache + arguments: + 1: + value: Neos_ContentRepositoryRegistry_CatchUpStates + +'Neos.ContentRepositoryRegistry:DbalConnection': + className: Doctrine\DBAL\Connection + factoryObjectName: Neos\ContentRepositoryRegistry\DoctrineDbalClient\DoctrineDbalClient + factoryMethodName: getConnection + +'Neos.ContentRepositoryRegistry:QueueLockStorage': + className: Symfony\Component\Lock\Store\DoctrineDbalStore + arguments: + 1: + object: 'Neos.ContentRepositoryRegistry:DbalConnection' diff --git a/Neos.ContentRepositoryRegistry/Tests/Unit/Service/CatchUpDeduplicationQueueTest.php b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/CatchUpDeduplicationQueueTest.php new file mode 100644 index 00000000000..35820bd6ef6 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/CatchUpDeduplicationQueueTest.php @@ -0,0 +1,76 @@ +requestCatchUp(Projections::fromArray([new FakeProjectionWithState($topic)])); + } + self::assertTrue(true); + } + + /** + * @group parallel_check + * @test + */ + public function validateEvents(): void + { + self::assertFileExists(FakeFileCatchUpTrigger::queueFilename()); + $lines = file(FakeFileCatchUpTrigger::queueFilename(), FILE_IGNORE_NEW_LINES) ?: []; + unlink(FakeFileCatchUpTrigger::queueFilename()); + $countersPerTopic = []; + self::assertGreaterThan(30, count($lines)); + foreach ($lines as $index => $line) { + [$topic, $runNumber, $counter] = explode(':', $line); + // we are ignoring the run number, it's more for diagnostic purposes, the order is not defined and can be assumed random. + $countersPerTopic[$topic] = $countersPerTopic[$topic] ?? 0; + self::assertSame($countersPerTopic[$topic], (int)$counter - 1, sprintf('Failed for topic "%s" in line %d:', $topic, $index + 1)); + $countersPerTopic[$topic] = (int)$counter; + } + + } + + private static function either(...$choices): string + { + return (string)$choices[array_rand($choices)]; + } +} + + diff --git a/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeFileCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeFileCatchUpTrigger.php new file mode 100644 index 00000000000..523a7d4542b --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeFileCatchUpTrigger.php @@ -0,0 +1,66 @@ +catchUp($projection); + } + } + + private function catchUp(ProjectionInterface $projection) + { + $lockFactory = new LockFactory(self::lockStore()); + $lock = $lockFactory->createLock(ProjectionCatchUpLockIdentifier::createRunning($this->contentRepositoryId, $projection::class)->value); + if (!$lock->acquire()) { + return; + } + try { + /** @phpstan-ignore-next-line */ + [$projectionTopic, $projectionRun] = explode(':', $projection->getState()->state); + $lines = file_exists(self::queueFilename()) ? file(self::queueFilename(), FILE_IGNORE_NEW_LINES) : []; + $newLines = []; + $counter = 0; + foreach ($lines as $line) { + $newLines[] = trim($line) . PHP_EOL; + [$topic, $run, $lineCounter] = explode(':', trim($line)); + if ($topic !== $projectionTopic) { + continue; + } + $counter = (int)$lineCounter; + } + $newLines[] = implode(':', [$projectionTopic, $projectionRun, $counter + 1]) . PHP_EOL; + file_put_contents(self::queueFilename(), $newLines); + } finally { + $lock->release(); + } + } + + public static function lockStore(): PersistingStoreInterface + { + return new PdoStore('sqlite:/tmp/neos_deduplication_test.db'); + } + + public static function queueFilename(): string + { + return '/tmp/neos_deduplication_test_projection.txt'; + } +} diff --git a/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionState.php b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionState.php new file mode 100644 index 00000000000..db32de6db49 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionState.php @@ -0,0 +1,17 @@ +state; + } +} diff --git a/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionWithState.php b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionWithState.php new file mode 100644 index 00000000000..76a2fda1f14 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Unit/Service/Fixture/FakeProjectionWithState.php @@ -0,0 +1,46 @@ +state); + } + + public function reset(): void + { + } + +} diff --git a/Neos.ContentRepositoryRegistry/composer.json b/Neos.ContentRepositoryRegistry/composer.json index 9a095a3f034..5bc23165b84 100644 --- a/Neos.ContentRepositoryRegistry/composer.json +++ b/Neos.ContentRepositoryRegistry/composer.json @@ -17,7 +17,8 @@ "neos/contentrepository-core": "self.version", "neos/contentrepositoryregistry-storageclient": "self.version", "symfony/property-access": "^5.4|^6.0", - "psr/clock": "^1" + "psr/clock": "^1", + "symfony/lock": "^6.0.0" }, "autoload": { "psr-4": { diff --git a/composer.json b/composer.json index e5bcc219ed7..d18f68a9c97 100644 --- a/composer.json +++ b/composer.json @@ -26,6 +26,7 @@ "neos/behat": "^9.0", "neos/contentrepositoryregistry-storageclient": "self.version", "symfony/property-access": "^5.4|^6.0", + "symfony/lock": "^6.0.0", "neos/fluid-adaptor": "*", "neos/cache": "*", "neos/eel": "*",