Skip to content

Commit

Permalink
TASK: Code cleanup and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
kitsunet committed Nov 28, 2023
1 parent 5843e5e commit 1a7990e
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;

Expand All @@ -23,13 +24,13 @@ class SubprocessProjectionCatchUpCommandController extends CommandController
* @Flow\Inject
* @var ContentRepositoryRegistry
*/
protected $contentRepositoryRegistry;
protected ContentRepositoryRegistry $contentRepositoryRegistry;

/**
* @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates")
* @var VariableFrontend
*/
protected $catchUpStatesCache;
protected VariableFrontend $catchUpStatesCache;


/**
Expand All @@ -39,8 +40,10 @@ class SubprocessProjectionCatchUpCommandController extends CommandController
*/
public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void
{
$contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier));
$contentRepositoryId = ContentRepositoryId::fromString($contentRepositoryIdentifier);
$runnerState = AsynchronousCatchUpRunnerState::create($contentRepositoryId, $projectionClassName, $this->catchUpStatesCache);
$contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId);
$contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create());
$this->catchUpStatesCache->remove(md5($contentRepositoryIdentifier . $projectionClassName) . 'RUNNING');
$runnerState->setStopped();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

use Neos\Cache\Frontend\VariableFrontend;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState;
use Neos\Flow\Annotations as Flow;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
Expand Down Expand Up @@ -38,80 +39,71 @@ public function triggerCatchUp(Projections $projections): void
{
// modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103
// and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php
$queuedProjections = array_map($this->startCatchUpWithQueueing(...), iterator_to_array($projections));
$queuedProjections = array_filter($queuedProjections);

$attempts = 0;
while (!empty($queuedProjections)) {
usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms
if (++$attempts > 50) {
throw new \RuntimeException('TIMEOUT while waiting for projections to run queued catch up.', 1550232279);
$queuedProjections = [];
foreach ($projections as $projection) {
$runnerState = AsynchronousCatchUpRunnerState::create($this->contentRepositoryId, $projection::class, $this->catchUpStatesCache);
if (!$runnerState->isRunning()) {
$this->startCatchUp($projection, $runnerState);
continue;
}

foreach ($queuedProjections as $key => $projection) {
if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED') === false) {
// another process has started a catchUp while we waited, our queue has become irrelevant
unset($queuedProjections[$key]);
}
$hasStarted = $this->startCatchUp($projection);
if ($hasStarted) {
unset($queuedProjections[$key]);
$this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED');
}
if (!$runnerState->isQueued()) {
$runnerState->queue();
$queuedProjections[] = [$projection, $runnerState];
}
$queuedProjections = array_values($queuedProjections);
}

for ($attempts = 0; $attempts < 50 && !empty($queuedProjections); $attempts++) {
// Incremental back off with some randomness to get a wide spread between processes.
usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms
$queuedProjections = $this->recheckQueuedProjections($queuedProjections);
}
}

/**
* @param ProjectionInterface $projection
* @return bool has catchUp been started for given projection
* @throws \Neos\Cache\Exception
* @param array<array{ProjectionInterface<ProjectionStateInterface>, AsynchronousCatchUpRunnerState}> $queuedProjections
* @return array<array{ProjectionInterface<ProjectionStateInterface>, AsynchronousCatchUpRunnerState}>
*/
private function startCatchUp(ProjectionInterface $projection): bool
private function recheckQueuedProjections(array $queuedProjections): array
{
if ($this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'RUNNING')) {
return false;
$nextQueuedProjections = [];
/**
* @var ProjectionInterface<ProjectionStateInterface> $projection
* @var AsynchronousCatchUpRunnerState $runnerState
*/
foreach ($queuedProjections as [$projection, $runnerState]) {
// another process has started a catchUp and cleared the queue while we waited, our queue has become irrelevant
if ($runnerState->isQueued() === false) {
continue;
}

if ($runnerState->isRunning() === false) {
$this->startCatchUp($projection, $runnerState);
}

$nextQueuedProjections[] = [$projection, $runnerState];
}

$this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'RUNNING', 1);
// We are about to start a catchUp and can therefore discard any QUEUE that exists right now, apparently someone else is waiting for it.
$this->catchUpStatesCache->remove($this->cacheKeyPrefix($projection) . 'QUEUED');
return $nextQueuedProjections;
}

/**
* @param ProjectionInterface<ProjectionStateInterface> $projection
* @param AsynchronousCatchUpRunnerState $runnerState
* @return void
*/
private function startCatchUp(ProjectionInterface $projection, AsynchronousCatchUpRunnerState $runnerState): void
{
$runnerState->run();
// We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it.
$runnerState->dequeue();
Scripts::executeCommandAsync(
'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup',
$this->flowSettings,
[
'contentRepositoryIdentifier' => $this->contentRepositoryId->value,
'projectionClassName' => get_class($projection)
'projectionClassName' => $projection::class
]
);

return true;
}

/**
* @param ProjectionInterface $projection
* @return ProjectionInterface|null Returns only projections that have been queued for later retry.
* @throws \Neos\Cache\Exception
*/
private function startCatchUpWithQueueing(ProjectionInterface $projection): ?ProjectionInterface
{
$catchUpStarted = $this->startCatchUp($projection);
if ($catchUpStarted) {
return null;
}

if (!$this->catchUpStatesCache->has($this->cacheKeyPrefix($projection) . 'QUEUED')) {
$this->catchUpStatesCache->set($this->cacheKeyPrefix($projection) . 'QUEUED', 1);
return $projection;
}

return null;
}

private function cacheKeyPrefix(ProjectionInterface $projection): string
{
$projectionClassName = get_class($projection);
return md5($this->contentRepositoryId->value . $projectionClassName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php
namespace Neos\ContentRepositoryRegistry\Service;

use Neos\Cache\Frontend\FrontendInterface;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;

/**
*
*/
final readonly class AsynchronousCatchUpRunnerState
{
private string $cacheKeyRunning;
private string $cacheKeyQueued;

private function __construct(
public ContentRepositoryId $contentRepositoryId,
public string $projectionClassName,
private FrontendInterface $cache
) {
$cacheKeyPrefix = $contentRepositoryId->value . '_' . md5($projectionClassName);
$this->cacheKeyRunning = $cacheKeyPrefix . '_RUNNING';
$this->cacheKeyQueued = $cacheKeyPrefix . '_QUEUED';
}

public static function create(ContentRepositoryId $contentRepositoryId, string $projectionClassName, FrontendInterface $cache): self
{
return new self($contentRepositoryId, $projectionClassName, $cache);
}

public function isRunning(): bool
{
return $this->cache->has($this->cacheKeyRunning);
}

public function run(): void
{
$this->cache->set($this->cacheKeyRunning, 1);
}

public function setStopped(): void
{
$this->cache->remove($this->cacheKeyRunning);
}

public function isQueued(): bool
{
return $this->cache->has($this->cacheKeyQueued);
}

public function queue(): void
{
$this->cache->set($this->cacheKeyQueued, 1);
}

public function dequeue(): void
{
$this->cache->remove($this->cacheKeyQueued);
}
}

0 comments on commit 1a7990e

Please sign in to comment.