Skip to content

Commit

Permalink
TASK: Migrate checkpoints to subscriptions via `migrateevents:migrate…
Browse files Browse the repository at this point in the history
…CheckpointsToSubscriptions`
  • Loading branch information
mhsdesign committed Dec 2, 2024
1 parent ea3eaa6 commit e750d93
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public function isEmpty(): bool
return $this->subscriptionsById === [];
}

public function first(): ?Subscription
{
foreach ($this->subscriptionsById as $subscription) {
return $subscription;
}
return null;
}

public function count(): int
{
return count($this->subscriptionsById);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,20 @@ public function copyNodesStatusCommand(string $contentRepository = 'default'): v
$eventMigrationService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, $this->eventMigrationServiceFactory);
$eventMigrationService->copyNodesStatus($this->outputLine(...));
}

/**
* Migrates the checkpoint tables to ACTIVE subscriptions
*
* Needed for #5321: https://github.com/neos/neos-development-collection/pull/5321
*
* Included in November 2024 - before final Neos 9.0 release
*
* @param string $contentRepository Identifier of the Content Repository to migrate
*/
public function migrateCheckpointsToSubscriptionsCommand(string $contentRepository = 'default'): void
{
$contentRepositoryId = ContentRepositoryId::fromString($contentRepository);
$eventMigrationService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, $this->eventMigrationServiceFactory);
$eventMigrationService->migrateCheckpointsToSubscriptions($this->outputLine(...));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateClassification;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\ContentRepository\Core\Subscription\Store\SubscriptionCriteria;
use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface;
use Neos\ContentRepository\Core\Subscription\Subscription;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\ContentRepositoryRegistry\Command\MigrateEventsCommandController;
use Neos\ContentRepositoryRegistry\Factory\EventStore\DoctrineEventStoreFactory;
use Neos\EventStore\EventStoreInterface;
Expand All @@ -44,6 +50,7 @@
use Neos\Neos\Domain\Model\WorkspaceClassification;
use Neos\Neos\Domain\Model\WorkspaceRole;
use Neos\Neos\Domain\Model\WorkspaceRoleSubjectType;
use Doctrine\DBAL\Exception as DBALException;

/**
* Content Repository service to perform migrations of events.
Expand All @@ -59,6 +66,7 @@ final class EventMigrationService implements ContentRepositoryServiceInterface
private array $eventsModified = [];

public function __construct(
private readonly SubscriptionEngine $subscriptionEngine,
private readonly ContentRepositoryId $contentRepositoryId,
private readonly EventStoreInterface $eventStore,
private readonly Connection $connection
Expand Down Expand Up @@ -903,6 +911,59 @@ public function copyNodesStatus(\Closure $outputFn): void
$outputFn('<comment>NOTE: To reduce the number of matched content streams and to cleanup the event store run `./flow contentStream:removeDangling` and `./flow contentStream:pruneRemovedFromEventStream`</comment>');
}

public function migrateCheckpointsToSubscriptions(\Closure $outputFn): void
{
/** @var SubscriptionStoreInterface $subscriptionStore */
$subscriptionStore = (new \ReflectionClass($this->subscriptionEngine))->getProperty('subscriptionStore')->getValue($this->subscriptionEngine);
$subscriptionStore->setup();

foreach ([
'contentGraph' => 'cr_%s_p_graph_checkpoint',
'Neos.Neos:DocumentUriPathProjection' => 'cr_%s_p_neos_documenturipath_checkpoint',
'Neos.Neos:PendingChangesProjection' => 'cr_%s_p_neos_change_checkpoint',
] as $subscriberId => $projectionCheckpointTablePattern) {
$projectionCheckpointTable = sprintf($projectionCheckpointTablePattern, $this->contentRepositoryId->value);
try {
$rows = $this->connection->fetchAllAssociative("SELECT appliedsequencenumber from {$projectionCheckpointTable}");
} catch (DBALException $e) {
$outputFn(sprintf('<comment>Could not migrate subscriber %s</comment>, please replay: %s', $subscriberId, $e->getMessage()));
continue;
}
$first = reset($rows);
if (count($rows) !== 1 || !isset($first['appliedsequencenumber'])) {
$outputFn(sprintf('<comment>Could not migrate subscriber %s</comment>, please replay. Expected exactly one appliedsequencenumber', $subscriberId));
continue;
}

$subscription = $subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::create([SubscriptionId::fromString($subscriberId)]))->first();

if ($subscription === null) {
$subscriptionStore->add(
new Subscription(
SubscriptionId::fromString($subscriberId),
SubscriptionStatus::ACTIVE,
SequenceNumber::fromInteger((int)$first['appliedsequencenumber']),
null,
null
)
);
$outputFn(sprintf('<success>Added subscriber %s with active and position %s</success>', $subscriberId, $first['appliedsequencenumber']));
} else {
if ($subscription->status === SubscriptionStatus::ACTIVE) {
$outputFn(sprintf('<success>Subscriber %s is already active</success>', $subscriberId));
continue;
}
$subscriptionStore->update(
SubscriptionId::fromString($subscriberId),
SubscriptionStatus::ACTIVE,
SequenceNumber::fromInteger((int)$first['appliedsequencenumber']),
null
);
$outputFn(sprintf('<success>Updated subscriber %s to active and position %s</success>', $subscriberId, $first['appliedsequencenumber']));
}
}
}

/** ------------------------ */

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public function build(ContentRepositoryServiceFactoryDependencies $serviceFactor
}

return new EventMigrationService(
$serviceFactoryDependencies->subscriptionEngine,
$serviceFactoryDependencies->contentRepositoryId,
$serviceFactoryDependencies->eventStore,
$this->connection
Expand Down

0 comments on commit e750d93

Please sign in to comment.