Skip to content

Commit

Permalink
Merge pull request #5010 from neos/feature/improved-cli-output-for-pr…
Browse files Browse the repository at this point in the history
…ojection-replay

FEATURE: Improved CLI output for projection replay
  • Loading branch information
bwaidelich authored Apr 25, 2024
2 parents 7a74b72 + b1fcbcc commit 84d2dfe
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* @implements \IteratorAggregate<ProjectionInterface>
* @internal
*/
final class Projections implements \IteratorAggregate
final class Projections implements \IteratorAggregate, \Countable
{
/**
* @var array<class-string<ProjectionInterface<ProjectionStateInterface>>, ProjectionInterface<ProjectionStateInterface>>
Expand Down Expand Up @@ -93,4 +93,9 @@ public function getIterator(): \Traversable
{
yield from $this->projections;
}

public function count(): int
{
return count($this->projections);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventStore\StatusType;
use Neos\Flow\Cli\CommandController;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Output\Output;

final class CrCommandController extends CommandController
Expand Down Expand Up @@ -116,6 +118,11 @@ public function statusCommand(string $contentRepository = 'default', bool $verbo
*/
public function projectionReplayCommand(string $projection, string $contentRepository = 'default', bool $force = false, bool $quiet = false, int $until = 0): void
{
if ($quiet) {
$this->output->getOutput()->setVerbosity(Output::VERBOSITY_QUIET);
}
$progressBar = new ProgressBar($this->output->getOutput());
$progressBar->setFormat(' %current%/%max% [%bar%] %percent:3s%% %elapsed:16s%/%estimated:-16s% %memory:6s%');
if (!$force && $quiet) {
$this->outputLine('Cannot run in quiet mode without --force. Please acknowledge that this command will reset and replay this projection. This may take some time.');
$this->quit(1);
Expand All @@ -129,19 +136,18 @@ public function projectionReplayCommand(string $projection, string $contentRepos
$contentRepositoryId = ContentRepositoryId::fromString($contentRepository);
$projectionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, $this->projectionServiceFactory);

$options = CatchUpOptions::create();
if (!$quiet) {
$this->outputLine('Replaying events for projection "%s" of Content Repository "%s" ...', [$projection, $contentRepositoryId->value]);
$this->output->progressStart();
$progressBar->start(max($until > 0 ? $until : $projectionService->highestSequenceNumber()->value, 1));
$options->with(progressCallback: fn () => $progressBar->advance());
}
$options = CatchUpOptions::create(
progressCallback: fn () => $this->output->progressAdvance(),
);
if ($until > 0) {
$options = $options->with(maximumSequenceNumber: SequenceNumber::fromInteger($until));
}
$projectionService->replayProjection($projection, $options);
if (!$quiet) {
$this->output->progressFinish();
$progressBar->finish();
$this->outputLine();
$this->outputLine('<success>Done.</success>');
}
Expand All @@ -153,9 +159,23 @@ public function projectionReplayCommand(string $projection, string $contentRepos
* @param string $contentRepository Identifier of the Content Repository instance to operate on
* @param bool $force Replay the projection without confirmation. This may take some time!
* @param bool $quiet If set only fatal errors are rendered to the output (must be used with --force flag to avoid user input)
* @param int $until Until which sequence number should projections be replayed? useful for debugging
*/
public function projectionReplayAllCommand(string $contentRepository = 'default', bool $force = false, bool $quiet = false): void
public function projectionReplayAllCommand(string $contentRepository = 'default', bool $force = false, bool $quiet = false, int $until = 0): void
{
if ($quiet) {
$this->output->getOutput()->setVerbosity(Output::VERBOSITY_QUIET);
}
$mainSection = ($this->output->getOutput() instanceof ConsoleOutput) ? $this->output->getOutput()->section() : $this->output->getOutput();
$mainProgressBar = new ProgressBar($mainSection);
$mainProgressBar->setBarCharacter('<success>█</success>');
$mainProgressBar->setEmptyBarCharacter('');
$mainProgressBar->setProgressCharacter('<success>█</success>');
$mainProgressBar->setFormat('debug');

$subSection = ($this->output->getOutput() instanceof ConsoleOutput) ? $this->output->getOutput()->section() : $this->output->getOutput();
$progressBar = new ProgressBar($subSection);
$progressBar->setFormat(' %message% - %current%/%max% [%bar%] %percent:3s%% %elapsed:16s%/%estimated:-16s% %memory:6s%');
if (!$force && $quiet) {
$this->outputLine('Cannot run in quiet mode without --force. Please acknowledge that this command will reset and replay this projection. This may take some time.');
$this->quit(1);
Expand All @@ -171,9 +191,28 @@ public function projectionReplayAllCommand(string $contentRepository = 'default'
if (!$quiet) {
$this->outputLine('Replaying events for all projections of Content Repository "%s" ...', [$contentRepositoryId->value]);
}
// TODO progress bar with all events? Like projectionReplayCommand?
$projectionService->replayAllProjections(CatchUpOptions::create(), fn (string $projectionAlias) => $this->outputLine(sprintf(' * replaying %s projection', $projectionAlias)));
$options = CatchUpOptions::create();
if (!$quiet) {
$options = $options->with(progressCallback: fn () => $progressBar->advance());
}
if ($until > 0) {
$options = $options->with(maximumSequenceNumber: SequenceNumber::fromInteger($until));
}
$highestSequenceNumber = max($until > 0 ? $until : $projectionService->highestSequenceNumber()->value, 1);
$mainProgressBar->start($projectionService->numberOfProjections());
$mainProgressCallback = null;
if (!$quiet) {
$mainProgressCallback = static function (string $projectionAlias) use ($mainProgressBar, $progressBar, $highestSequenceNumber) {
$mainProgressBar->advance();
$progressBar->setMessage($projectionAlias);
$progressBar->start($highestSequenceNumber);
$progressBar->setProgress(0);
};
}
$projectionService->replayAllProjections($options, $mainProgressCallback);
if (!$quiet) {
$mainProgressBar->finish();
$progressBar->finish();
$this->outputLine('<success>Done.</success>');
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventStream\VirtualStreamName;

/**
* Content Repository service to perform Projection replays
Expand All @@ -21,6 +24,7 @@ final class ProjectionReplayService implements ContentRepositoryServiceInterface
public function __construct(
private readonly Projections $projections,
private readonly ContentRepository $contentRepository,
private readonly EventStoreInterface $eventStore,
) {
}

Expand Down Expand Up @@ -49,6 +53,19 @@ public function resetAllProjections(): void
}
}

public function highestSequenceNumber(): SequenceNumber
{
foreach ($this->eventStore->load(VirtualStreamName::all())->backwards()->limit(1) as $eventEnvelope) {
return $eventEnvelope->sequenceNumber;
}
return SequenceNumber::none();
}

public function numberOfProjections(): int
{
return count($this->projections);
}

/**
* @return class-string<ProjectionInterface<ProjectionStateInterface>>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public function build(ContentRepositoryServiceFactoryDependencies $serviceFactor
return new ProjectionReplayService(
$serviceFactoryDependencies->projections,
$serviceFactoryDependencies->contentRepository,
$serviceFactoryDependencies->eventStore,
);
}
}

0 comments on commit 84d2dfe

Please sign in to comment.