Skip to content

Commit

Permalink
TASK: Remove subscription:reactivate for now
Browse files Browse the repository at this point in the history
The concept of reactivation in case of errors was rather experimental. For core projections we would rather recommend a full replay, and its unlikely that other third party projections will profit from this now.
Instead, if deemed necessary the reactivation api can be reintroduced later with little effort.

Also, the case for detached projections is considered an edge-case and a simple replay should suffice.

The change was done to minimalize confusion and simplify the api.
  • Loading branch information
mhsdesign committed Dec 16, 2024
1 parent 6b90c92 commit 525fcc4
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,68 +24,6 @@

final class ProjectionErrorTest extends AbstractSubscriptionEngineTestCase
{
/** @test */
public function projectionWithErrorCanBeReactivated()
{
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->subscriptionEngine->setup();
$this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok());
$result = $this->subscriptionEngine->boot();
self::assertEquals(ProcessedResult::success(0), $result);
$this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::none());
$this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none());

// commit an event
$this->commitExampleContentStreamEvent();

// catchup active tries to apply the commited event
$exception = new \RuntimeException('This projection is kaputt.');
$this->fakeProjection->expects($i = self::exactly(2))->method('apply')->willReturnCallback(function ($_, EventEnvelope $eventEnvelope) use ($i, $exception) {
match($i->getInvocationCount()) {
1 => [
self::assertEquals(1, $eventEnvelope->sequenceNumber->value),
throw $exception
],
2 => [
// on second call is repaired:
self::assertEquals(1, $eventEnvelope->sequenceNumber->value),
]
};
});
$expectedStatusForFailedProjection = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception),
setupStatus: ProjectionStatus::ok(),
);

$result = $this->subscriptionEngine->catchUpActive();
self::assertEquals(ProcessedResult::failed(1, Errors::fromArray([Error::create(
SubscriptionId::fromString('Vendor.Package:FakeProjection'),
$exception->getMessage(),
$exception,
SequenceNumber::fromInteger(1)
)])), $result);

self::assertEquals(
$expectedStatusForFailedProjection,
$this->subscriptionStatus('Vendor.Package:FakeProjection')
);
$this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1));

//
// fix projection and catchup
//

// reactivate and catchup
$result = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:FakeProjection')]));
self::assertNull($result->errors);

$this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1));
}

/** @test */
public function fixFailedProjectionViaReset()
{
Expand Down Expand Up @@ -189,23 +127,6 @@ public function irreparableProjection()
self::assertEquals(Result::success(), $result);
self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection'));

// reactivation will attempt to retry fix this, but can only work if the projection is repaired and will lead to an error otherwise:
$result = $this->subscriptionEngine->reactivate();
self::assertEquals(1, $result->numberOfProcessedEvents);
self::assertEquals('Must not happen! Debug projection detected duplicate event 1 of type ContentStreamWasCreated', $result->errors->first()?->message);

self::assertEquals(
ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
// previous state is now an error too also error:
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ERROR, $result->errors->first()->throwable),
setupStatus: ProjectionStatus::ok(),
),
$this->subscriptionStatus('Vendor.Package:SecondFakeProjection')
);

// expect the subscriptionError to be reset to null
$result = $this->subscriptionEngine->reset();
self::assertNull($result->errors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public function projectionIsDetachedOnCatchupActive()
}

/** @test */
public function projectionIsDetachedOnSetupAndReattachedIfPossible()
public function projectionIsDetachedOnSetup()
{
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
Expand Down Expand Up @@ -159,12 +159,6 @@ public function projectionIsDetachedOnSetupAndReattachedIfPossible()
),
$this->subscriptionStatus('Vendor.Package:FakeProjection')
);

// reactivate does re-attach as the projection if its found again
$result = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:FakeProjection')]));
self::assertNull($result->errors);

$this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1));
}

/** @test */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,19 @@
*
* Special cases:
*
* *Replay*
* *Replay for initialisation*
*
* For initialising on a new database - which contains events already - a replay will make sure that the subscriptions
* are emptied and reapply the events. This can be triggered via {@see replaySubscription} or {@see replayAllSubscriptions}
*
* And after registering a new subscription a setup as well as a replay of this subscription is also required.
*
* *Reactivate*
* *Replay to repair*
*
* In case a subscription is detached but is reinstalled a reactivation is needed via {@see reactivateSubscription}
* In case a subscription is detached and then reinstalled a replay will make sure its caught up to all new events.
* And that the previous state will be reset as the projections logic might have changed.
*
* Also in case a subscription runs into the error status, its code needs to be fixed, and it can also be attempted to be reactivated.
*
* Note that in both cases a subscription replay would also work, but with the difference that the subscription is reset as well.
* Also in case a subscription runs into the error status, its code needs to be fixed, and it can be attempted to be replayed.
*
* @api
*/
Expand Down Expand Up @@ -149,29 +148,6 @@ public function replayAllSubscriptions(\Closure|null $progressCallback = null):
return null;
}

/**
* Reactivate a subscription
*
* The explicit catchup is only needed for subscriptions in the error or detached status with an advanced position.
* Running a full replay would work but might be overkill, instead this reactivation will just attempt
* catchup the subscription back to active from its current position.
*/
public function reactivateSubscription(SubscriptionId $subscriptionId, \Closure|null $progressCallback = null): Error|null
{
$subscriptionStatus = $this->subscriptionEngine->subscriptionStatus(SubscriptionEngineCriteria::create([$subscriptionId]))->first();
if ($subscriptionStatus === null) {
return new Error(sprintf('Subscription "%s" is not registered.', $subscriptionId->value));
}
if ($subscriptionStatus->subscriptionStatus === SubscriptionStatus::NEW) {
return new Error(sprintf('Subscription "%s" is not setup and cannot be reactivated.', $subscriptionId->value));
}
$reactivateResult = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([$subscriptionId]), progressCallback: $progressCallback, batchSize: self::REPLAY_BATCH_SIZE);
if ($reactivateResult->errors !== null) {
return self::createErrorForReason('Could not reactivate subscriber:', $reactivateResult->errors);
}
return null;
}

/**
* WARNING: Removes all events from the content repository and resets the subscriptions
* This operation cannot be undone.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,6 @@ public function catchUpActive(SubscriptionEngineCriteria|null $criteria = null,
);
}

public function reactivate(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null, int $batchSize = null): ProcessedResult
{
$criteria ??= SubscriptionEngineCriteria::noConstraints();
return $this->processExclusively(
fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::ERROR, SubscriptionStatus::DETACHED]), $progressCallback, $batchSize)
);
}

public function reset(SubscriptionEngineCriteria|null $criteria = null): Result
{
$criteria ??= SubscriptionEngineCriteria::noConstraints();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ public function statusCommand(string $contentRepository = 'default', bool $verbo
$contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory());
$crStatus = $contentRepositoryMaintainer->status();
$hasErrors = false;
$reactivationRequired = false;
$replayRequired = false;
$setupRequired = false;
$bootingRequired = false;
$this->outputLine('Event Store:');
$this->output(' Setup: ');
$this->outputLine(match ($crStatus->eventStoreStatus->type) {
Expand Down Expand Up @@ -148,9 +147,9 @@ public function statusCommand(string $contentRepository = 'default', bool $verbo
$this->outputLine(' at position <b>%d</b>', [$status->subscriptionPosition->value]);
}
$hasErrors |= $status->subscriptionStatus === SubscriptionStatus::ERROR;
$reactivationRequired |= $status->subscriptionStatus === SubscriptionStatus::ERROR;
$bootingRequired |= $status->subscriptionStatus === SubscriptionStatus::BOOTING;
$reactivationRequired |= $status->subscriptionStatus === SubscriptionStatus::DETACHED;
$replayRequired |= $status->subscriptionStatus === SubscriptionStatus::ERROR;
$replayRequired |= $status->subscriptionStatus === SubscriptionStatus::BOOTING;
$replayRequired |= $status->subscriptionStatus === SubscriptionStatus::DETACHED;
if ($verbose && $status->subscriptionError !== null) {
$lines = explode(chr(10), $status->subscriptionError->errorMessage ?: '<comment>No details available.</comment>');
foreach ($lines as $line) {
Expand All @@ -164,11 +163,8 @@ public function statusCommand(string $contentRepository = 'default', bool $verbo
if ($setupRequired) {
$this->outputLine('<comment>Setup required, please run <em>./flow cr:setup</em></comment>');
}
if ($bootingRequired) {
$this->outputLine('<comment>Replay needed for <comment>BOOTING</comment> projections, please run <em>./flow subscription:replay [subscription-id]</em></comment>');
}
if ($reactivationRequired) {
$this->outputLine('<comment>Reactivation of <comment>ERROR</comment> or <comment>DETACHED</comment> projection required, please run <em>./flow subscription:reactivate [subscription-id]</em></comment>');
if ($replayRequired) {
$this->outputLine('<comment>Replay needed for <comment>BOOTING</comment>, <comment>ERROR</comment> or <comment>DETACHED</comment> subscriptions, please run <em>./flow subscription:replay [subscription-id]</em></comment>');
}
}
if ($hasErrors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
*
* If any interaction is required "./flow cr:status" can be asked.
*
* *Replay*
* *Replay for initialisation*
*
* For initialising on a new database - which contains events already - a replay will make sure that the subscriptions
* are emptied and reapply the events. This can be triggered via "./flow subscription:replay --subscription contentGraph" or "./flow subscription:replayall"
*
* And after registering a new subscription a setup as well as a replay of this subscription is also required.
*
* *Reactivate*
* *Replay to repair*
*
* In case a subscription is detached but is reinstalled a reactivation is needed via "./flow subscription:reactivate --subscription contentGraph"
* In case a subscription is detached and then reinstalled a replay will make sure its caught up to all new events.
* And that the previous state will be reset as the projections logic might have changed.
*
* Also in case a subscription runs into the error status, its code needs to be fixed, and it can also be attempted to be reactivated.
* Also in case a subscription runs into the error status, its code needs to be fixed, and it can be attempted to be replayed.
*
* See also {@see ContentRepositoryMaintainer} for more information.
*/
Expand Down Expand Up @@ -137,44 +138,4 @@ public function replayAllCommand(string $contentRepository = 'default', bool $fo
$this->outputLine('<success>Done.</success>');
}
}

/**
* Reactivate a subscription
*
* The explicit catchup is only needed for projections in the error or detached status with an advanced position.
* Running a full replay would work but might be overkill, instead this reactivation will just attempt
* catchup the subscription back to active from its current position.
*
* @param string $subscription Identifier of the subscription to reactivate like it was configured (e.g. "contentGraph", "Vendor.Package:YourProjection")
* @param string $contentRepository Identifier of the Content Repository instance to operate on
* @param bool $quiet If set only fatal errors are rendered to the output (must be used with --force flag to avoid user input)
*/
public function reactivateCommand(string $subscription, string $contentRepository = 'default', bool $quiet = false): void
{
$contentRepositoryId = ContentRepositoryId::fromString($contentRepository);
$contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory());

$progressCallback = null;
if (!$quiet) {
$this->outputLine('Reactivate subscription "%s" of Content Repository "%s" ...', [$subscription, $contentRepositoryId->value]);
// render memory consumption and time remaining
$this->output->getProgressBar()->setFormat('debug');
$this->output->progressStart();
$progressCallback = fn () => $this->output->progressAdvance();
}

$result = $contentRepositoryMaintainer->reactivateSubscription(SubscriptionId::fromString($subscription), progressCallback: $progressCallback);

if (!$quiet) {
$this->output->progressFinish();
$this->outputLine();
}

if ($result !== null) {
$this->outputLine('<error>%s</error>', [$result->getMessage()]);
$this->quit(1);
} elseif (!$quiet) {
$this->outputLine('<success>Done.</success>');
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public function projectionInError(): void

// repair projection
$this->secondFakeProjection->killSaboteur();
$this->subscriptionController->reactivateCommand(subscription: 'Vendor.Package:SecondFakeProjection', contentRepository: $this->contentRepository->id->value, quiet: true);
$this->subscriptionController->replayCommand(subscription: 'Vendor.Package:SecondFakeProjection', contentRepository: $this->contentRepository->id->value, force: true, quiet: true);

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2));
}
Expand Down

0 comments on commit 525fcc4

Please sign in to comment.