diff --git a/Neos.ContentRepository.BehavioralTests/Classes/TestSuite/Behavior/CRBehavioralTestsSubjectProvider.php b/Neos.ContentRepository.BehavioralTests/Classes/TestSuite/Behavior/CRBehavioralTestsSubjectProvider.php index 3fb475d02bd..2fb4fe8ac9b 100644 --- a/Neos.ContentRepository.BehavioralTests/Classes/TestSuite/Behavior/CRBehavioralTestsSubjectProvider.php +++ b/Neos.ContentRepository.BehavioralTests/Classes/TestSuite/Behavior/CRBehavioralTestsSubjectProvider.php @@ -18,12 +18,14 @@ use Behat\Gherkin\Node\TableNode; use Doctrine\DBAL\Connection; use Neos\ContentRepository\Core\ContentRepository; -use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; +use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine; use Neos\ContentRepository\TestSuite\Behavior\Features\Bootstrap\Helpers\GherkinTableNodeBasedContentDimensionSource; use Neos\ContentRepository\TestSuite\Fakes\FakeContentDimensionSourceFactory; use Neos\ContentRepository\TestSuite\Fakes\FakeNodeTypeManagerFactory; use Neos\EventStore\EventStoreInterface; +use PHPUnit\Framework\Assert; use Symfony\Component\Yaml\Yaml; /** @@ -179,20 +181,26 @@ protected function setUpContentRepository(ContentRepositoryId $contentRepository * Catch Up process and the testcase reset. */ $contentRepository = $this->createContentRepository($contentRepositoryId); - $subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory()); + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); if (!in_array($contentRepository->id, self::$alreadySetUpContentRepositories)) { - $subscriptionService->setupEventStore(); - $subscriptionService->subscriptionEngine->setup(); + $result = $contentRepositoryMaintainer->setUp(); + Assert::assertNull($result); self::$alreadySetUpContentRepositories[] = $contentRepository->id; } + // todo we TRUNCATE here and do not want to use $contentRepositoryMaintainer->prune(); here as it would not reset the autoincrement sequence number making some assertions impossible /** @var EventStoreInterface $eventStore */ $eventStore = (new \ReflectionClass($contentRepository))->getProperty('eventStore')->getValue($contentRepository); /** @var Connection $databaseConnection */ $databaseConnection = (new \ReflectionClass($eventStore))->getProperty('connection')->getValue($eventStore); $eventTableName = sprintf('cr_%s_events', $contentRepositoryId->value); $databaseConnection->executeStatement('TRUNCATE ' . $eventTableName); - $subscriptionService->subscriptionEngine->reset(); - $subscriptionService->subscriptionEngine->boot(); + + /** @var SubscriptionEngine $subscriptionEngine */ + $subscriptionEngine = (new \ReflectionClass($contentRepositoryMaintainer))->getProperty('subscriptionEngine')->getValue($contentRepositoryMaintainer); + $result = $subscriptionEngine->reset(); + Assert::assertNull($result->errors); + $result = $subscriptionEngine->boot(); + Assert::assertNull($result->errors); return $contentRepository; } diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/AbstractSubscriptionEngineTestCase.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/AbstractSubscriptionEngineTestCase.php index 1602b362781..ca9c4de33eb 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/AbstractSubscriptionEngineTestCase.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/AbstractSubscriptionEngineTestCase.php @@ -15,13 +15,12 @@ use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStateInterface; use Neos\ContentRepository\Core\Projection\ProjectionStatus; -use Neos\ContentRepository\Core\Service\SubscriptionService; -use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; +use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine; -use Neos\ContentRepository\Core\Subscription\Store\SubscriptionCriteria; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus; +use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\ContentRepository\TestSuite\Fakes\FakeCatchUpHookFactory; @@ -37,12 +36,13 @@ use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +/** + * @internal, only for tests of the Neos.* namespace + */ abstract class AbstractSubscriptionEngineTestCase extends TestCase // we don't use Flows functional test case as it would reset the database afterwards { protected ContentRepository $contentRepository; - protected SubscriptionService $subscriptionService; - protected SubscriptionEngine $subscriptionEngine; protected EventStoreInterface $eventStore; @@ -102,8 +102,6 @@ final protected function setupContentRepositoryDependencies(ContentRepositoryId $contentRepositoryId ); - $this->subscriptionService = $this->getObject(ContentRepositoryRegistry::class)->buildService($contentRepositoryId, new SubscriptionServiceFactory()); - $subscriptionEngineAndEventStoreAccessor = new class implements ContentRepositoryServiceFactoryInterface { public EventStoreInterface|null $eventStore; public SubscriptionEngine|null $subscriptionEngine; @@ -140,9 +138,9 @@ final protected function resetDatabase(Connection $connection, ContentRepository $connection->prepare('SET FOREIGN_KEY_CHECKS = 1;')->executeStatement(); } - final protected function subscriptionStatus(string $subscriptionId): ?SubscriptionAndProjectionStatus + final protected function subscriptionStatus(string $subscriptionId): ProjectionSubscriptionStatus|DetachedSubscriptionStatus|null { - return $this->subscriptionService->subscriptionEngine->subscriptionStatuses(SubscriptionCriteria::create(ids: [SubscriptionId::fromString($subscriptionId)]))->first(); + return $this->subscriptionEngine->subscriptionStatus(SubscriptionEngineCriteria::create(ids: [SubscriptionId::fromString($subscriptionId)]))->first(); } final protected function commitExampleContentStreamEvent(): void @@ -162,12 +160,12 @@ final protected function expectOkayStatus($subscriptionId, SubscriptionStatus $s { $actual = $this->subscriptionStatus($subscriptionId); self::assertEquals( - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString($subscriptionId), subscriptionStatus: $status, subscriptionPosition: $sequenceNumber, subscriptionError: null, - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ), $actual ); diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php index 63b1bbc6806..3396ec98639 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php @@ -7,7 +7,7 @@ use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; use Neos\ContentRepository\Core\Projection\ProjectionStatus; use Neos\ContentRepository\Core\Subscription\Exception\CatchUpFailed; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionError; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; @@ -18,11 +18,11 @@ final class CatchUpHookErrorTest extends AbstractSubscriptionEngineTestCase /** @test */ public function error_onBeforeEvent_projectionIsNotRun() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::once())->method('apply'); - $this->subscriptionService->subscriptionEngine->setup(); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); // commit an event $this->commitExampleContentStreamEvent(); @@ -37,12 +37,12 @@ public function error_onBeforeEvent_projectionIsNotRun() $this->secondFakeProjection->injectSaboteur(fn () => self::fail('Projection apply is not expected to be called!')); - $expectedFailure = SubscriptionAndProjectionStatus::create( + $expectedFailure = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::ERROR, subscriptionPosition: SequenceNumber::none(), subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception), - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ); self::assertEmpty( @@ -66,11 +66,11 @@ public function error_onBeforeEvent_projectionIsNotRun() /** @test */ public function error_onAfterEvent_projectionIsRolledBack() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::once())->method('apply'); - $this->subscriptionService->subscriptionEngine->setup(); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); // commit an event $this->commitExampleContentStreamEvent(); @@ -83,12 +83,12 @@ public function error_onAfterEvent_projectionIsRolledBack() // TODO pass the error subscription status to onAfterCatchUp, so that in case of an error it can be prevented that mails f.x. will be sent? $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp'); - $expectedFailure = SubscriptionAndProjectionStatus::create( + $expectedFailure = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::ERROR, subscriptionPosition: SequenceNumber::none(), subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception), - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ); self::assertEmpty( @@ -112,11 +112,11 @@ public function error_onAfterEvent_projectionIsRolledBack() /** @test */ public function error_onBeforeCatchUp_abortsCatchup() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::never())->method('apply'); - $this->subscriptionService->subscriptionEngine->setup(); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); @@ -158,11 +158,11 @@ public function error_onBeforeCatchUp_abortsCatchup() /** @test */ public function error_onAfterCatchUp_abortsCatchupAndRollBack() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::once())->method('apply'); - $this->subscriptionService->subscriptionEngine->setup(); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookTest.php index 71fb6e8ca7c..56b9d632597 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookTest.php @@ -13,11 +13,11 @@ final class CatchUpHookTest extends AbstractSubscriptionEngineTestCase /** @test */ public function catchUpHooksAreExecutedAndCanAccessTheCorrectProjectionsState() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::once())->method('apply'); - $this->subscriptionService->subscriptionEngine->setup(); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); // commit an event $this->commitExampleContentStreamEvent(); diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php index 86550aae570..53ded4f33c3 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php @@ -12,7 +12,7 @@ use Neos\ContentRepository\Core\Subscription\Engine\Error; use Neos\ContentRepository\Core\Subscription\Engine\Errors; use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionError; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; @@ -25,11 +25,11 @@ final class ProjectionErrorTest extends AbstractSubscriptionEngineTestCase /** @test */ public function projectionWithError() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->subscriptionService->subscriptionEngine->setup(); + $this->subscriptionEngine->setup(); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(0), $result); $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::none()); $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); @@ -41,15 +41,15 @@ public function projectionWithError() $this->fakeProjection->expects(self::once())->method('apply')->with(self::isInstanceOf(ContentStreamWasCreated::class))->willThrowException( $exception = new \RuntimeException('This projection is kaputt.') ); - $expectedStatusForFailedProjection = SubscriptionAndProjectionStatus::create( + $expectedStatusForFailedProjection = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), subscriptionStatus: SubscriptionStatus::ERROR, subscriptionPosition: SequenceNumber::none(), subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception), - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ); - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertEquals(ProcessedResult::failed(1, Errors::fromArray([Error::fromSubscriptionIdAndException(SubscriptionId::fromString('Vendor.Package:FakeProjection'), $exception)])), $result); self::assertEquals( @@ -60,13 +60,13 @@ public function projectionWithError() // todo test retry if reimplemented: https://github.com/patchlevel/event-sourcing/blob/6826d533fd4762220f0397bc7afc589abb8c901b/src/Subscription/RetryStrategy/RetryStrategy.php // // CatchUp 2 with retry - // $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + // $result = $this->subscriptionEngine->catchUpActive(); // self::assertTrue($result->hasFailed()); // self::assertEquals($result->errors->first()->message, 'Something really wrong.'); // self::assertEquals($this->subscriptionStatus('Vendor.Package:FakeProjection')->subscriptionError->errorMessage, 'Something really wrong.'); // no retry, nothing to do. - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertEquals(ProcessedResult::success(0), $result); self::assertEquals($this->subscriptionStatus('Vendor.Package:FakeProjection')->subscriptionError->errorMessage, 'This projection is kaputt.'); self::assertEquals( @@ -78,11 +78,11 @@ public function projectionWithError() /** @test */ public function fixFailedProjection() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->subscriptionEngine->setup(); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); // commit an event $this->commitExampleContentStreamEvent(); @@ -93,15 +93,15 @@ public function fixFailedProjection() null // okay again ); - $expectedFailure = SubscriptionAndProjectionStatus::create( + $expectedFailure = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), subscriptionStatus: SubscriptionStatus::ERROR, subscriptionPosition: SequenceNumber::none(), subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception), - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ); - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertTrue($result->hasFailed()); self::assertEquals( @@ -110,10 +110,10 @@ public function fixFailedProjection() ); // catchup active does not change anything - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertEquals(ProcessedResult::success(0), $result); // boot neither - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(0), $result); // still the same state self::assertEquals( @@ -123,13 +123,13 @@ public function fixFailedProjection() $this->fakeProjection->expects(self::once())->method('resetState'); - $result = $this->subscriptionService->subscriptionEngine->reset(); + $result = $this->subscriptionEngine->reset(); self::assertNull($result->errors); // expect the subscriptionError to be reset to null $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertNull($result->errors); $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); @@ -138,12 +138,12 @@ public function fixFailedProjection() /** @test */ public function projectionIsRolledBackAfterError() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::exactly(2))->method('setUp'); $this->fakeProjection->expects(self::once())->method('apply'); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertNull($result->errors); // commit an event @@ -153,19 +153,19 @@ public function projectionIsRolledBackAfterError() $this->secondFakeProjection->injectSaboteur(fn () => throw $exception); - $expectedFailure = SubscriptionAndProjectionStatus::create( + $expectedFailure = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::ERROR, subscriptionPosition: SequenceNumber::none(), subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception), - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ); self::assertEmpty( $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertSame($result->errors?->first()->message, 'This projection is kaputt.'); self::assertEquals( @@ -184,14 +184,14 @@ public function projectionIsRolledBackAfterError() $this->secondFakeProjection->killSaboteur(); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); // subscriptionError is reset $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); // catchup after fix - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertNull($result->errors); self::assertEquals( @@ -203,11 +203,11 @@ public function projectionIsRolledBackAfterError() /** @test */ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::exactly(2))->method('setUp'); $this->fakeProjection->expects(self::exactly(2))->method('apply'); - $this->subscriptionService->subscriptionEngine->setup(); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); // commit two events $this->commitExampleContentStreamEvent(); @@ -227,15 +227,15 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents() $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertTrue($result->hasFailed()); - $expectedFailure = SubscriptionAndProjectionStatus::create( + $expectedFailure = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::ERROR, subscriptionPosition: SequenceNumber::fromInteger(1), subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception), - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ); self::assertEquals( @@ -255,7 +255,7 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents() $this->secondFakeProjection->killSaboteur(); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); // subscriptionError is reset, but the position is preserved @@ -266,7 +266,7 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents() ); // catchup after fix - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertNull($result->errors); self::assertEquals( @@ -278,11 +278,11 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents() /** @test */ public function projectionErrorWithMultipleProjectionsInContentRepositoryHandle() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->subscriptionEngine->setup(); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); $this->fakeProjection->expects(self::once())->method('apply')->with(self::isInstanceOf(ContentStreamWasCreated::class))->willThrowException( $originalException = new \RuntimeException('This projection is kaputt.'), diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionActiveStatusTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionActiveStatusTest.php index 2585852b998..421d9a4c212 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionActiveStatusTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionActiveStatusTest.php @@ -17,12 +17,12 @@ final class SubscriptionActiveStatusTest extends AbstractSubscriptionEngineTestC /** @test */ public function setupProjectionsAndCatchup() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->subscriptionService->subscriptionEngine->setup(); + $this->subscriptionEngine->setup(); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(0), $result); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::none()); @@ -33,7 +33,7 @@ public function setupProjectionsAndCatchup() $this->commitExampleContentStreamEvent(); // subsequent catchup setup'd does not change the position - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(0), $result); $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::none()); $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); @@ -41,7 +41,7 @@ public function setupProjectionsAndCatchup() // catchup active does apply the commited event $this->fakeProjection->expects(self::once())->method('apply')->with(self::isInstanceOf(ContentStreamWasCreated::class)); - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertEquals(ProcessedResult::success(1), $result); $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); @@ -55,9 +55,9 @@ public function filteringCatchUpActive() $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); $result = $this->subscriptionEngine->boot(); self::assertNull($result->errors); @@ -85,13 +85,13 @@ public function filteringCatchUpActive() /** @test */ public function catchupWithNoEventsKeepsThePreviousPositionOfTheSubscribers() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->subscriptionEngine->setup(); + $this->subscriptionEngine->setup(); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(0), $result); $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::none()); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); @@ -101,13 +101,13 @@ public function catchupWithNoEventsKeepsThePreviousPositionOfTheSubscribers() // catchup active does apply the commited event $this->fakeProjection->expects(self::once())->method('apply')->with(self::isInstanceOf(ContentStreamWasCreated::class)); - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertEquals(ProcessedResult::success(1), $result); $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); // empty catchup must keep the sequence numbers of the projections okay - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertEquals(ProcessedResult::success(0), $result); $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionBootingStatusTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionBootingStatusTest.php index 01b15d9137c..a89370bd57e 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionBootingStatusTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionBootingStatusTest.php @@ -23,20 +23,20 @@ public function existingEventStoreEventsAreCaughtUpOnBoot() $this->commitExampleContentStreamEvent(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->subscriptionService->subscriptionEngine->setup(); + $this->subscriptionEngine->setup(); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); $this->expectOkayStatus('contentGraph', SubscriptionStatus::BOOTING, SequenceNumber::none()); $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); $this->fakeProjection->expects(self::once())->method('apply')->with(self::isInstanceOf(ContentStreamWasCreated::class)); - $this->subscriptionService->subscriptionEngine->boot(); + $this->subscriptionEngine->boot(); $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); // catchup is a noop because there are no unhandled events - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertEquals(ProcessedResult::success(0), $result); } @@ -46,9 +46,9 @@ public function filteringCatchUpBoot() $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionDetachedStatusTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionDetachedStatusTest.php index 2a45ec60c4e..dabdcce9cac 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionDetachedStatusTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionDetachedStatusTest.php @@ -5,8 +5,9 @@ namespace Neos\ContentRepository\BehavioralTests\Tests\Functional\Subscription; use Neos\ContentRepository\Core\Projection\ProjectionStatus; +use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; @@ -28,10 +29,10 @@ public function projectionIsDetachedOnCatchupActive() $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->setupEventStore(); - $this->subscriptionService->subscriptionEngine->setup(); + $this->eventStore->setup(); + $this->subscriptionEngine->setup(); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(0), $result); $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); @@ -45,8 +46,15 @@ public function projectionIsDetachedOnCatchupActive() $this->getObject(ContentRepositoryRegistry::class)->resetFactoryInstance($this->contentRepository->id); $this->setupContentRepositoryDependencies($this->contentRepository->id); - // todo status is stale??, should be DETACHED, and also cr:setup should marke detached projections?!! - // $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); + self::assertEquals( + DetachedSubscriptionStatus::create( + subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), + // the state is still active as we do not mutate it during the setup call! + subscriptionStatus: SubscriptionStatus::ACTIVE, + subscriptionPosition: SequenceNumber::none() + ), + $this->subscriptionStatus('Vendor.Package:FakeProjection') + ); $this->fakeProjection->expects(self::never())->method('apply'); // catchup to mark detached subscribers @@ -54,15 +62,12 @@ public function projectionIsDetachedOnCatchupActive() // todo result should reflect that there was an detachment? Throw error in CR? self::assertEquals(ProcessedResult::success(1), $result); - $expectedDetachedState = SubscriptionAndProjectionStatus::create( - subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), - subscriptionStatus: SubscriptionStatus::DETACHED, - subscriptionPosition: SequenceNumber::none(), - subscriptionError: null, - projectionStatus: null // not calculate-able at this point! - ); self::assertEquals( - $expectedDetachedState, + $expectedDetachedState = DetachedSubscriptionStatus::create( + subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), + subscriptionStatus: SubscriptionStatus::DETACHED, + subscriptionPosition: SequenceNumber::none() + ), $this->subscriptionStatus('Vendor.Package:FakeProjection') ); @@ -96,12 +101,12 @@ public function projectionIsDetachedOnSetupAndReattachedIfPossible() $this->fakeProjection->expects(self::once())->method('apply'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->setupEventStore(); - $this->subscriptionService->subscriptionEngine->setup(); + $this->eventStore->setup(); + $this->subscriptionEngine->setup(); $this->commitExampleContentStreamEvent(); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(1), $result); $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); @@ -120,12 +125,10 @@ public function projectionIsDetachedOnSetupAndReattachedIfPossible() // todo result should reflect that there was an detachment? self::assertNull($result->errors); - $expectedDetachedState = SubscriptionAndProjectionStatus::create( + $expectedDetachedState = DetachedSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), subscriptionStatus: SubscriptionStatus::DETACHED, - subscriptionPosition: SequenceNumber::fromInteger(1), - subscriptionError: null, - projectionStatus: null // not calculate-able at this point! + subscriptionPosition: SequenceNumber::fromInteger(1) ); self::assertEquals( $expectedDetachedState, @@ -147,12 +150,12 @@ public function projectionIsDetachedOnSetupAndReattachedIfPossible() $this->setupContentRepositoryDependencies($this->contentRepository->id); self::assertEquals( - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), subscriptionStatus: SubscriptionStatus::DETACHED, subscriptionPosition: SequenceNumber::fromInteger(1), subscriptionError: null, - projectionStatus: ProjectionStatus::ok() // state _IS_ calculate-able at this point, todo better reflect meaning: is detached, but re-attachable! + setupStatus: ProjectionStatus::ok() // state _IS_ calculate-able at this point, todo better reflect meaning: is detached, but re-attachable! ), $this->subscriptionStatus('Vendor.Package:FakeProjection') ); diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionGetStatusTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionGetStatusTest.php index 594c7264e92..0a624d4c08b 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionGetStatusTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionGetStatusTest.php @@ -6,12 +6,15 @@ use Doctrine\DBAL\Connection; use Neos\ContentRepository\Core\Projection\ProjectionStatus; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory; use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatuses; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; +use Neos\ContentRepository\Core\Subscription\SubscriptionStatusCollection; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; +use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\EventStore\Model\Event\SequenceNumber; +use Neos\EventStore\Model\EventStore\StatusType; final class SubscriptionGetStatusTest extends AbstractSubscriptionEngineTestCase { @@ -25,8 +28,13 @@ public function statusOnEmptyDatabase() keepSchema: false ); - $actualStatuses = $this->subscriptionService->subscriptionEngine->subscriptionStatuses(); - self::assertTrue($actualStatuses->isEmpty()); + $crMaintainer = $this->getObject(ContentRepositoryRegistry::class)->buildService($this->contentRepository->id, new ContentRepositoryMaintainerFactory()); + + $status = $crMaintainer->status(); + + self::assertEquals(StatusType::SETUP_REQUIRED, $status->eventStoreStatus->type); + self::assertNull($status->eventStorePosition); + self::assertTrue($status->subscriptionStatus->isEmpty()); self::assertNull( $this->subscriptionStatus('contentGraph') @@ -48,29 +56,29 @@ public function statusOnEmptyDatabase() $this->fakeProjection->expects(self::once())->method('status')->willReturn(ProjectionStatus::setupRequired('fake needs setup.')); - $actualStatuses = $this->subscriptionService->subscriptionEngine->subscriptionStatuses(); + $actualStatuses = $this->subscriptionEngine->subscriptionStatus(); - $expected = SubscriptionAndProjectionStatuses::fromArray([ - SubscriptionAndProjectionStatus::create( + $expected = SubscriptionStatusCollection::fromArray([ + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('contentGraph'), subscriptionStatus: SubscriptionStatus::BOOTING, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ), - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), subscriptionStatus: SubscriptionStatus::NEW, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::setupRequired('fake needs setup.'), + setupStatus: ProjectionStatus::setupRequired('fake needs setup.'), ), - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::NEW, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::setupRequired('Requires 1 SQL statements'), + setupStatus: ProjectionStatus::setupRequired('Requires 1 SQL statements'), ), ]); diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionNewStatusTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionNewStatusTest.php index fc8e61beddc..86c8ae2e0d5 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionNewStatusTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionNewStatusTest.php @@ -9,7 +9,7 @@ use Neos\ContentRepository\Core\Projection\ProjectionStatus; use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult; use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\ContentRepository\TestSuite\Fakes\FakeProjectionFactory; @@ -32,10 +32,10 @@ public function newProjectionIsFoundWhenConfigurationIsAdded() $this->fakeProjection->expects(self::exactly(2))->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->setupEventStore(); - $this->subscriptionService->subscriptionEngine->setup(); + $this->eventStore->setup(); + $this->subscriptionEngine->setup(); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(0), $result); self::assertNull($this->subscriptionStatus('Vendor.Package:NewFakeProjection')); @@ -43,7 +43,8 @@ public function newProjectionIsFoundWhenConfigurationIsAdded() $newFakeProjection = $this->getMockBuilder(ProjectionInterface::class)->disableAutoReturnValueGeneration()->getMock(); $newFakeProjection->method('getState')->willReturn(new class implements ProjectionStateInterface {}); - $newFakeProjection->expects(self::exactly(3))->method('status')->willReturnOnConsecutiveCalls( + $newFakeProjection->expects(self::exactly(4))->method('status')->willReturnOnConsecutiveCalls( + ProjectionStatus::setupRequired('Set me up'), ProjectionStatus::setupRequired('Set me up'), ProjectionStatus::ok(), ProjectionStatus::ok(), @@ -65,21 +66,26 @@ public function newProjectionIsFoundWhenConfigurationIsAdded() $this->getObject(ContentRepositoryRegistry::class)->resetFactoryInstance($this->contentRepository->id); $this->setupContentRepositoryDependencies($this->contentRepository->id); - // todo status doesnt find this projection yet? - self::assertNull($this->subscriptionStatus('Vendor.Package:NewFakeProjection')); + $expectedNewState = ProjectionSubscriptionStatus::create( + subscriptionId: SubscriptionId::fromString('Vendor.Package:NewFakeProjection'), + subscriptionStatus: SubscriptionStatus::NEW, + subscriptionPosition: SequenceNumber::none(), + subscriptionError: null, + setupStatus: ProjectionStatus::setupRequired('Set me up') + ); + + // status predicts the NEW state already (without creating it in the db) + self::assertEquals( + $expectedNewState, + $this->subscriptionStatus('Vendor.Package:NewFakeProjection') + ); // do something that finds new subscriptions, trigger a setup on a specific projection: $result = $this->subscriptionEngine->setup(SubscriptionEngineCriteria::create([SubscriptionId::fromString('contentGraph')])); self::assertNull($result->errors); self::assertEquals( - SubscriptionAndProjectionStatus::create( - subscriptionId: SubscriptionId::fromString('Vendor.Package:NewFakeProjection'), - subscriptionStatus: SubscriptionStatus::NEW, - subscriptionPosition: SequenceNumber::none(), - subscriptionError: null, - projectionStatus: ProjectionStatus::setupRequired('Set me up') - ), + $expectedNewState, $this->subscriptionStatus('Vendor.Package:NewFakeProjection') ); diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionResetTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionResetTest.php index ea1a741ad57..cbebb621ceb 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionResetTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionResetTest.php @@ -18,9 +18,9 @@ public function filteringReset() $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); self::assertEmpty( $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionSetupTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionSetupTest.php index 1483e90851f..dd021dc8183 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionSetupTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionSetupTest.php @@ -6,8 +6,8 @@ use Neos\ContentRepository\Core\Projection\ProjectionStatus; use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatuses; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; +use Neos\ContentRepository\Core\Subscription\SubscriptionStatusCollection; use Neos\ContentRepository\Core\Subscription\SubscriptionError; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; @@ -18,35 +18,35 @@ final class SubscriptionSetupTest extends AbstractSubscriptionEngineTestCase /** @test */ public function setupOnEmptyDatabase() { - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->subscriptionService->subscriptionEngine->setup(); + $this->subscriptionEngine->setup(); $this->fakeProjection->expects(self::exactly(2))->method('status')->willReturn(ProjectionStatus::ok()); - $actualStatuses = $this->subscriptionService->subscriptionEngine->subscriptionStatuses(); + $actualStatuses = $this->subscriptionEngine->subscriptionStatus(); - $expected = SubscriptionAndProjectionStatuses::fromArray([ - SubscriptionAndProjectionStatus::create( + $expected = SubscriptionStatusCollection::fromArray([ + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('contentGraph'), subscriptionStatus: SubscriptionStatus::BOOTING, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ), - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), subscriptionStatus: SubscriptionStatus::BOOTING, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ), - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::BOOTING, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::ok(), + setupStatus: ProjectionStatus::ok(), ), ]); @@ -67,22 +67,22 @@ public function filteringSetup() $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::once())->method('status')->willReturn(ProjectionStatus::ok()); - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); $filter = SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:FakeProjection')]); - $result = $this->subscriptionService->subscriptionEngine->setup($filter); + $result = $this->subscriptionEngine->setup($filter); self::assertNull($result->errors); $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); self::assertEquals( - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::NEW, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::ok() + setupStatus: ProjectionStatus::ok() ), $this->subscriptionStatus('Vendor.Package:SecondFakeProjection') ); @@ -97,11 +97,11 @@ public function setupIsInvokedForBootingSubscribers() // hard reset, so that the tests actually need sql migrations $this->secondFakeProjection->dropTables(); - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); // initial setup for FakeProjection - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); @@ -110,17 +110,17 @@ public function setupIsInvokedForBootingSubscribers() $this->secondFakeProjection->schemaNeedsAdditionalColumn('column_after_update'); self::assertEquals( - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::BOOTING, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::setupRequired('Requires 1 SQL statements') + setupStatus: ProjectionStatus::setupRequired('Requires 1 SQL statements') ), $this->subscriptionStatus('Vendor.Package:SecondFakeProjection') ); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); @@ -138,35 +138,35 @@ public function setupIsInvokedForPreviouslyActiveSubscribers() // hard reset, so that the tests actually need sql migrations $this->secondFakeProjection->dropTables(); - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); // setup subscription tables - $result = $this->subscriptionService->subscriptionEngine->setup(SubscriptionEngineCriteria::create([SubscriptionId::fromString('contentGraph')])); + $result = $this->subscriptionEngine->setup(SubscriptionEngineCriteria::create([SubscriptionId::fromString('contentGraph')])); self::assertNull($result->errors); self::assertEquals( - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::NEW, subscriptionPosition: SequenceNumber::none(), subscriptionError: null, - projectionStatus: ProjectionStatus::setupRequired('Requires 1 SQL statements') + setupStatus: ProjectionStatus::setupRequired('Requires 1 SQL statements') ), $this->subscriptionStatus('Vendor.Package:SecondFakeProjection') ); // initial setup for FakeProjection - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); - $result = $this->subscriptionService->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->boot(); self::assertNull($result->errors); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); // regular work $this->commitExampleContentStreamEvent(); - $result = $this->subscriptionService->subscriptionEngine->catchUpActive(); + $result = $this->subscriptionEngine->catchUpActive(); self::assertNull($result->errors); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); @@ -176,17 +176,17 @@ public function setupIsInvokedForPreviouslyActiveSubscribers() $this->secondFakeProjection->schemaNeedsAdditionalColumn('column_after_update'); self::assertEquals( - SubscriptionAndProjectionStatus::create( + ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::ACTIVE, subscriptionPosition: SequenceNumber::fromInteger(1), subscriptionError: null, - projectionStatus: ProjectionStatus::setupRequired('Requires 1 SQL statements') + setupStatus: ProjectionStatus::setupRequired('Requires 1 SQL statements') ), $this->subscriptionStatus('Vendor.Package:SecondFakeProjection') ); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); @@ -200,17 +200,17 @@ public function failingSetupWillMarkProjectionAsErrored() ); $this->fakeProjection->expects(self::once())->method('status')->willReturn(ProjectionStatus::setupRequired('Needs setup')); - $this->subscriptionService->setupEventStore(); + $this->eventStore->setup(); - $result = $this->subscriptionService->subscriptionEngine->setup(); + $result = $this->subscriptionEngine->setup(); self::assertSame('Projection could not be setup', $result->errors?->first()->message); - $expectedFailure = SubscriptionAndProjectionStatus::create( + $expectedFailure = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), subscriptionStatus: SubscriptionStatus::ERROR, subscriptionPosition: SequenceNumber::none(), subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::NEW, $exception), - projectionStatus: ProjectionStatus::setupRequired('Needs setup'), + setupStatus: ProjectionStatus::setupRequired('Needs setup'), ); self::assertEquals( diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php index d4646e8165e..efa68008845 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Parallel/AbstractParallelTestCase.php @@ -14,10 +14,9 @@ namespace Neos\ContentRepository\BehavioralTests\Tests\Parallel; -use Doctrine\DBAL\Connection; use Neos\ContentRepository\Core\ContentRepository; -use Neos\ContentRepository\Core\Service\SubscriptionService; -use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainer; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\Flow\Core\Bootstrap; @@ -71,19 +70,11 @@ final protected function setUpContentRepository( ContentRepositoryId $contentRepositoryId ): ContentRepository { $contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId); - /** @var SubscriptionService $subscriptionService */ - $subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory()); - $subscriptionService->setupEventStore(); - $subscriptionService->subscriptionEngine->setup(); - - $connection = $this->objectManager->get(Connection::class); - + /** @var ContentRepositoryMaintainer $contentRepositoryMaintainer */ + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); + $contentRepositoryMaintainer->setUp(); // reset events and projections - $eventTableName = sprintf('cr_%s_events', $contentRepositoryId->value); - $connection->executeStatement('TRUNCATE ' . $eventTableName); - - $subscriptionService->subscriptionEngine->reset(); - $subscriptionService->subscriptionEngine->boot(); + $contentRepositoryMaintainer->prune(); return $contentRepository; } diff --git a/Neos.ContentRepository.Core/Classes/Projection/ProjectionInterface.php b/Neos.ContentRepository.Core/Classes/Projection/ProjectionInterface.php index 7e195ff4644..aff57389895 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/ProjectionInterface.php +++ b/Neos.ContentRepository.Core/Classes/Projection/ProjectionInterface.php @@ -26,7 +26,7 @@ interface ProjectionInterface public function setUp(): void; /** - * Determines the status of the projection (not to confuse with {@see getState()}) + * Determines the setup status of the projection. E.g. are the database tables created or any columns missing. */ public function status(): ProjectionStatus; diff --git a/Neos.ContentRepository.Core/Classes/Projection/ProjectionStatus.php b/Neos.ContentRepository.Core/Classes/Projection/ProjectionStatus.php index 517c53d7a7c..17ddc848176 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/ProjectionStatus.php +++ b/Neos.ContentRepository.Core/Classes/Projection/ProjectionStatus.php @@ -5,6 +5,10 @@ namespace Neos\ContentRepository\Core\Projection; /** + * The setup status of a projection. + * + * E.g. are the database tables created or any columns missing. + * * @api */ final readonly class ProjectionStatus @@ -35,20 +39,4 @@ public static function setupRequired(string $details): self { return new self(ProjectionStatusType::SETUP_REQUIRED, $details); } - - /** - * @param non-empty-string $details - */ - public static function replayRequired(string $details): self - { - return new self(ProjectionStatusType::REPLAY_REQUIRED, $details); - } - - /** - * @param non-empty-string $details - */ - public function withDetails(string $details): self - { - return new self($this->type, $details); - } } diff --git a/Neos.ContentRepository.Core/Classes/Projection/ProjectionStatusType.php b/Neos.ContentRepository.Core/Classes/Projection/ProjectionStatusType.php index 48681927742..285ad8e86d4 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/ProjectionStatusType.php +++ b/Neos.ContentRepository.Core/Classes/Projection/ProjectionStatusType.php @@ -9,8 +9,17 @@ */ enum ProjectionStatusType { + /** + * No actions needed + */ case OK; - case ERROR; + /** + * The projection needs to be setup to adjust its schema + * {@see \Neos\ContentRepository\Core\Service\ContentRepositoryMaintainer::setUp()} + */ case SETUP_REQUIRED; - case REPLAY_REQUIRED; + /** + * An error occurred while determining the status (e.g. connection is closed) + */ + case ERROR; } diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php new file mode 100644 index 00000000000..4adb2301ca9 --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php @@ -0,0 +1,249 @@ +eventStore->setup(); + $eventStoreIsEmpty = iterator_count($this->eventStore->load(VirtualStreamName::all())->limit(1)) === 0; + $setupResult = $this->subscriptionEngine->setup(); + if ($setupResult->errors !== null) { + return self::createErrorForReason('setup', $setupResult->errors); + } + if ($eventStoreIsEmpty) { + // note: possibly introduce $skipBooting flag instead + // see https://github.com/patchlevel/event-sourcing/blob/b8591c56b21b049f46bead8e7ab424fd2afe9917/src/Subscription/Engine/DefaultSubscriptionEngine.php#L42 + $bootResult = $this->subscriptionEngine->boot(); + if ($bootResult->errors !== null) { + return self::createErrorForReason('initial catchup', $bootResult->errors); + } + } + return null; + } + + public function status(): ContentRepositoryStatus + { + try { + $lastEventEnvelope = current(iterator_to_array($this->eventStore->load(VirtualStreamName::all())->backwards()->limit(1))) ?: null; + $sequenceNumber = $lastEventEnvelope?->sequenceNumber ?? SequenceNumber::none(); + } catch (DBALException) { + $sequenceNumber = null; + } + + return ContentRepositoryStatus::create( + $this->eventStore->status(), + $sequenceNumber, + $this->subscriptionEngine->subscriptionStatus() + ); + } + + public function replaySubscription(SubscriptionId $subscriptionId, \Closure|null $progressCallback = null): Error|null + { + $subscriptionStatus = $this->subscriptionEngine->subscriptionStatus(SubscriptionEngineCriteria::create([$subscriptionId]))->first(); + if ($subscriptionStatus === null) { + return new Error(sprintf('Subscription "%s" is not registered.', $subscriptionId->value)); + } + if ($subscriptionStatus->subscriptionStatus === SubscriptionStatus::NEW) { + return new Error(sprintf('Subscription "%s" is not setup and cannot be replayed.', $subscriptionId->value)); + } + $resetResult = $this->subscriptionEngine->reset(SubscriptionEngineCriteria::create([$subscriptionId])); + if ($resetResult->errors !== null) { + return self::createErrorForReason('reset', $resetResult->errors); + } + $bootResult = $this->subscriptionEngine->boot(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback); + if ($bootResult->errors !== null) { + return self::createErrorForReason('catchup', $bootResult->errors); + } + return null; + } + + public function replayAllSubscriptions(\Closure|null $progressCallback = null): Error|null + { + $resetResult = $this->subscriptionEngine->reset(); + if ($resetResult->errors !== null) { + return self::createErrorForReason('reset', $resetResult->errors); + } + $bootResult = $this->subscriptionEngine->boot(progressCallback: $progressCallback); + if ($bootResult->errors !== null) { + return self::createErrorForReason('catchup', $bootResult->errors); + } + return null; + } + + /** + * Reactivate a subscription + * + * The explicit catchup is only needed for subscriptions in the error or detached status with an advanced position. + * Running a full replay would work but might be overkill, instead this reactivation will just attempt + * catchup the subscription back to active from its current position. + */ + public function reactivateSubscription(SubscriptionId $subscriptionId, \Closure|null $progressCallback = null): Error|null + { + $subscriptionStatus = $this->subscriptionEngine->subscriptionStatus(SubscriptionEngineCriteria::create([$subscriptionId]))->first(); + if ($subscriptionStatus === null) { + return new Error(sprintf('Subscription "%s" is not registered.', $subscriptionId->value)); + } + if ($subscriptionStatus->subscriptionStatus === SubscriptionStatus::NEW) { + return new Error(sprintf('Subscription "%s" is not setup and cannot be reactivated.', $subscriptionId->value)); + } + + // todo implement https://github.com/patchlevel/event-sourcing/blob/b8591c56b21b049f46bead8e7ab424fd2afe9917/src/Subscription/Engine/DefaultSubscriptionEngine.php#L624 + return null; + } + + /** + * WARNING: Removes all events from the content repository and resets the subscriptions + * This operation cannot be undone. + */ + public function prune(): Error|null + { + // prune all streams: + foreach ($this->findAllContentStreamStreamNames() as $contentStreamStreamName) { + $this->eventStore->deleteStream($contentStreamStreamName); + } + foreach ($this->findAllWorkspaceStreamNames() as $workspaceStreamName) { + $this->eventStore->deleteStream($workspaceStreamName); + } + $resetResult = $this->subscriptionEngine->reset(); + if ($resetResult->errors !== null) { + return self::createErrorForReason('reset', $resetResult->errors); + } + // note: possibly introduce $skipBooting flag like for setup + $bootResult = $this->subscriptionEngine->boot(); + if ($bootResult->errors !== null) { + return self::createErrorForReason('booting', $bootResult->errors); + } + return null; + } + + private static function createErrorForReason(string $method, Errors $errors): Error + { + $message = []; + $message[] = sprintf('%s produced the following error%s', $method, $errors->count() === 1 ? '' : 's'); + foreach ($errors as $error) { + $message[] = sprintf(' Subscription "%s": %s', $error->subscriptionId->value, $error->message); + } + return new Error(join("\n", $message)); + } + + /** + * @return list + */ + private function findAllContentStreamStreamNames(): array + { + $events = $this->eventStore->load( + VirtualStreamName::forCategory(ContentStreamEventStreamName::EVENT_STREAM_NAME_PREFIX), + EventStreamFilter::create( + EventTypes::create( + // we are only interested in the creation events to limit the amount of events to fetch + EventType::fromString('ContentStreamWasCreated'), + EventType::fromString('ContentStreamWasForked') + ) + ) + ); + $allStreamNames = []; + foreach ($events as $eventEnvelope) { + $allStreamNames[] = $eventEnvelope->streamName; + } + return array_unique($allStreamNames, SORT_REGULAR); + } + + /** + * @return list + */ + private function findAllWorkspaceStreamNames(): array + { + $events = $this->eventStore->load( + VirtualStreamName::forCategory(WorkspaceEventStreamName::EVENT_STREAM_NAME_PREFIX), + EventStreamFilter::create( + EventTypes::create( + // we are only interested in the creation events to limit the amount of events to fetch + EventType::fromString('RootWorkspaceWasCreated'), + EventType::fromString('WorkspaceWasCreated') + ) + ) + ); + $allStreamNames = []; + foreach ($events as $eventEnvelope) { + $allStreamNames[] = $eventEnvelope->streamName; + } + return array_unique($allStreamNames, SORT_REGULAR); + } +} diff --git a/Neos.ContentRepository.Core/Classes/Service/SubscriptionServiceFactory.php b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainerFactory.php similarity index 57% rename from Neos.ContentRepository.Core/Classes/Service/SubscriptionServiceFactory.php rename to Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainerFactory.php index 3fe17594800..234b01fcb1d 100644 --- a/Neos.ContentRepository.Core/Classes/Service/SubscriptionServiceFactory.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainerFactory.php @@ -8,17 +8,17 @@ use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceFactoryInterface; /** - * @implements ContentRepositoryServiceFactoryInterface + * @implements ContentRepositoryServiceFactoryInterface * @api */ -class SubscriptionServiceFactory implements ContentRepositoryServiceFactoryInterface +class ContentRepositoryMaintainerFactory implements ContentRepositoryServiceFactoryInterface { public function build( ContentRepositoryServiceFactoryDependencies $serviceFactoryDependencies - ): SubscriptionService { - return new SubscriptionService( + ): ContentRepositoryMaintainer { + return new ContentRepositoryMaintainer( $serviceFactoryDependencies->eventStore, - $serviceFactoryDependencies->subscriptionEngine, + $serviceFactoryDependencies->subscriptionEngine ); } } diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php index a71f8a4d7d2..5f1991c0a77 100644 --- a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php @@ -4,7 +4,6 @@ namespace Neos\ContentRepository\Core\Service; -use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\EventStore\EventNormalizer; use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface; use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; @@ -27,7 +26,6 @@ use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Model\Event\EventType; use Neos\EventStore\Model\Event\EventTypes; -use Neos\EventStore\Model\Event\StreamName; use Neos\EventStore\Model\EventStream\EventStreamFilter; use Neos\EventStore\Model\EventStream\ExpectedVersion; use Neos\EventStore\Model\EventStream\VirtualStreamName; @@ -200,15 +198,6 @@ public function pruneRemovedFromEventStream(\Closure $outputFn): void } } - public function pruneAllWorkspacesAndContentStreamsFromEventStream(): void - { - foreach ($this->findAllContentStreamStreamNames() as $contentStreamStreamName) { - $this->eventStore->deleteStream($contentStreamStreamName); - } - foreach ($this->findAllWorkspaceStreamNames() as $workspaceStreamName) { - $this->eventStore->deleteStream($workspaceStreamName); - } - } /** * Find all removed content streams that are unused in the event stream @@ -411,48 +400,4 @@ private function findAllContentStreams(): array } return $cs; } - - /** - * @return list - */ - private function findAllContentStreamStreamNames(): array - { - $events = $this->eventStore->load( - VirtualStreamName::forCategory(ContentStreamEventStreamName::EVENT_STREAM_NAME_PREFIX), - EventStreamFilter::create( - EventTypes::create( - // we are only interested in the creation events to limit the amount of events to fetch - EventType::fromString('ContentStreamWasCreated'), - EventType::fromString('ContentStreamWasForked') - ) - ) - ); - $allStreamNames = []; - foreach ($events as $eventEnvelope) { - $allStreamNames[] = $eventEnvelope->streamName; - } - return array_unique($allStreamNames, SORT_REGULAR); - } - - /** - * @return list - */ - private function findAllWorkspaceStreamNames(): array - { - $events = $this->eventStore->load( - VirtualStreamName::forCategory(WorkspaceEventStreamName::EVENT_STREAM_NAME_PREFIX), - EventStreamFilter::create( - EventTypes::create( - // we are only interested in the creation events to limit the amount of events to fetch - EventType::fromString('RootWorkspaceWasCreated'), - EventType::fromString('WorkspaceWasCreated') - ) - ) - ); - $allStreamNames = []; - foreach ($events as $eventEnvelope) { - $allStreamNames[] = $eventEnvelope->streamName; - } - return array_unique($allStreamNames, SORT_REGULAR); - } } diff --git a/Neos.ContentRepository.Core/Classes/Service/SubscriptionService.php b/Neos.ContentRepository.Core/Classes/Service/SubscriptionService.php deleted file mode 100644 index 321df1534ff..00000000000 --- a/Neos.ContentRepository.Core/Classes/Service/SubscriptionService.php +++ /dev/null @@ -1,32 +0,0 @@ -eventStore->setup(); - } - - public function eventStoreStatus(): Status - { - return $this->eventStore->status(); - } -} diff --git a/Neos.ContentRepository.Core/Classes/SharedModel/ContentRepository/ContentRepositoryStatus.php b/Neos.ContentRepository.Core/Classes/SharedModel/ContentRepository/ContentRepositoryStatus.php new file mode 100644 index 00000000000..db546bf9f9f --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/SharedModel/ContentRepository/ContentRepositoryStatus.php @@ -0,0 +1,55 @@ + - * @api + * @internal implementation detail of the catchup */ final readonly class Errors implements \IteratorAggregate, \Countable { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/ProcessedResult.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/ProcessedResult.php index 61f1a07e840..e90e6975fa2 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/ProcessedResult.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/ProcessedResult.php @@ -5,7 +5,7 @@ namespace Neos\ContentRepository\Core\Subscription\Engine; /** - * @api + * @internal implementation detail of the catchup */ final readonly class ProcessedResult { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/Result.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/Result.php index dc8da2ef08b..dab71033f97 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/Result.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/Result.php @@ -5,7 +5,7 @@ namespace Neos\ContentRepository\Core\Subscription\Engine; /** - * @api + * @internal implementation detail of the catchup */ final readonly class Result { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php index 0ad1265502c..7f8e1f43aa1 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php @@ -5,12 +5,15 @@ namespace Neos\ContentRepository\Core\Subscription\Engine; use Doctrine\DBAL\Exception\TableNotFoundException; +use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\EventStore\EventNormalizer; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainer; +use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\Exception\CatchUpFailed; use Neos\ContentRepository\Core\Subscription\Exception\SubscriptionEngineAlreadyProcessingException; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus; -use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatuses; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; +use Neos\ContentRepository\Core\Subscription\SubscriptionStatusCollection; use Neos\ContentRepository\Core\Subscription\SubscriptionStatusFilter; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Model\Event\SequenceNumber; @@ -25,7 +28,12 @@ use Neos\ContentRepository\Core\Subscription\Subscriptions; /** - * @api + * This is the internal core for the catchup + * + * All functionality is low level and well encapsulated and abstracted by the {@see ContentRepositoryMaintainer} + * It presents the only API way to interact with catchup and offers more maintenance tasks. + * + * @internal implementation detail of the catchup. See {@see ContentRepository::handle()} and {@see ContentRepositoryMaintainer} */ final class SubscriptionEngine { @@ -103,26 +111,51 @@ public function reset(SubscriptionEngineCriteria|null $criteria = null): Result return $errors === [] ? Result::success() : Result::failed(Errors::fromArray($errors)); } - public function subscriptionStatuses(SubscriptionCriteria|null $criteria = null): SubscriptionAndProjectionStatuses + public function subscriptionStatus(SubscriptionEngineCriteria|null $criteria = null): SubscriptionStatusCollection { $statuses = []; try { - $subscriptions = $this->subscriptionStore->findByCriteria($criteria ?? SubscriptionCriteria::noConstraints()); + $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::create(ids: $criteria?->ids)); } catch (TableNotFoundException) { // the schema is not setup - thus there are no subscribers - return SubscriptionAndProjectionStatuses::createEmpty(); + return SubscriptionStatusCollection::createEmpty(); } foreach ($subscriptions as $subscription) { - $subscriber = $this->subscribers->contain($subscription->id) ? $this->subscribers->get($subscription->id) : null; - $statuses[] = SubscriptionAndProjectionStatus::create( + if (!$this->subscribers->contain($subscription->id)) { + $statuses[] = DetachedSubscriptionStatus::create( + $subscription->id, + $subscription->status, + $subscription->position + ); + continue; + } + $subscriber = $this->subscribers->get($subscription->id); + $statuses[] = ProjectionSubscriptionStatus::create( subscriptionId: $subscription->id, subscriptionStatus: $subscription->status, subscriptionPosition: $subscription->position, subscriptionError: $subscription->error, - projectionStatus: $subscriber?->projection->status(), + setupStatus: $subscriber->projection->status(), + ); + } + foreach ($this->subscribers as $subscriber) { + if ($subscriptions->contain($subscriber->id)) { + continue; + } + if ($criteria?->ids?->contain($subscriber->id) === false) { + // this might be a NEW subscription but we dont return it as status is filtered. + continue; + } + // this NEW state is not persisted yet + $statuses[] = ProjectionSubscriptionStatus::create( + subscriptionId: $subscriber->id, + subscriptionStatus: SubscriptionStatus::NEW, + subscriptionPosition: SequenceNumber::none(), + subscriptionError: null, + setupStatus: $subscriber->projection->status(), ); } - return SubscriptionAndProjectionStatuses::fromArray($statuses); + return SubscriptionStatusCollection::fromArray($statuses); } private function handleEvent(EventEnvelope $eventEnvelope, EventInterface $domainEvent, Subscription $subscription): Error|null @@ -345,7 +378,7 @@ function (Subscriptions $subscriptions) use ($subscriptionStatus, $progressClosu private function processExclusively(\Closure $closure): mixed { if ($this->processing) { - throw new SubscriptionEngineAlreadyProcessingException(); + throw new SubscriptionEngineAlreadyProcessingException('Subscription engine is already processing', 1732714075); } $this->processing = true; try { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngineCriteria.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngineCriteria.php index 3cb3946aa17..068f5a15a98 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngineCriteria.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngineCriteria.php @@ -8,7 +8,7 @@ use Neos\ContentRepository\Core\Subscription\SubscriptionIds; /** - * @internal + * @internal implementation detail of the catchup */ final class SubscriptionEngineCriteria { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionManager.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionManager.php index 4fd33df2388..2daa7e9db3f 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionManager.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionManager.php @@ -9,7 +9,9 @@ use Neos\ContentRepository\Core\Subscription\Subscription; use Neos\ContentRepository\Core\Subscription\Subscriptions; -/** @internal */ +/** + * @internal implementation detail of the catchup + */ final class SubscriptionManager { /** @var \SplObjectStorage */ diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpFailed.php b/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpFailed.php index c18aaf9c3ba..2cee008bda8 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpFailed.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpFailed.php @@ -6,6 +6,7 @@ /** * Only thrown if there is no way to recover the started catchup. The transaction will be rolled back. + * * @api */ final class CatchUpFailed extends \RuntimeException diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Exception/SubscriptionEngineAlreadyProcessingException.php b/Neos.ContentRepository.Core/Classes/Subscription/Exception/SubscriptionEngineAlreadyProcessingException.php index 9631724adfb..4747a062f31 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Exception/SubscriptionEngineAlreadyProcessingException.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Exception/SubscriptionEngineAlreadyProcessingException.php @@ -9,11 +9,4 @@ */ final class SubscriptionEngineAlreadyProcessingException extends \RuntimeException { - /** - * @internal - */ - public function __construct() - { - parent::__construct('Subscription engine is already processing'); - } } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionAndProjectionStatus.php b/Neos.ContentRepository.Core/Classes/Subscription/ProjectionSubscriptionStatus.php similarity index 72% rename from Neos.ContentRepository.Core/Classes/Subscription/SubscriptionAndProjectionStatus.php rename to Neos.ContentRepository.Core/Classes/Subscription/ProjectionSubscriptionStatus.php index 56fca0ac88a..3e443da6171 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionAndProjectionStatus.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/ProjectionSubscriptionStatus.php @@ -8,26 +8,29 @@ use Neos\EventStore\Model\Event\SequenceNumber; /** - * @api + * @api part of the subscription status */ -final readonly class SubscriptionAndProjectionStatus +final readonly class ProjectionSubscriptionStatus { private function __construct( public SubscriptionId $subscriptionId, public SubscriptionStatus $subscriptionStatus, public SequenceNumber $subscriptionPosition, public SubscriptionError|null $subscriptionError, - public ProjectionStatus|null $projectionStatus, + public ProjectionStatus $setupStatus, ) { } + /** + * @internal implementation detail of the catchup + */ public static function create( SubscriptionId $subscriptionId, SubscriptionStatus $subscriptionStatus, SequenceNumber $subscriptionPosition, SubscriptionError|null $subscriptionError, - ProjectionStatus|null $projectionStatus + ProjectionStatus $setupStatus ): self { - return new self($subscriptionId, $subscriptionStatus, $subscriptionPosition, $subscriptionError, $projectionStatus); + return new self($subscriptionId, $subscriptionStatus, $subscriptionPosition, $subscriptionError, $setupStatus); } } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionCriteria.php b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionCriteria.php index 8eca4a8ed79..5bb7c30b2ea 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionCriteria.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionCriteria.php @@ -11,7 +11,7 @@ use Neos\ContentRepository\Core\Subscription\SubscriptionStatusFilter; /** - * @api + * @internal implementation detail of the catchup */ final readonly class SubscriptionCriteria { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php index 4b6a827f8e6..0127769fde8 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php @@ -8,7 +8,7 @@ use Neos\ContentRepository\Core\Subscription\Subscriptions; /** - * @api + * @internal only API for custom content repository integrations */ interface SubscriptionStoreInterface { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/ProjectionSubscriber.php b/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/ProjectionSubscriber.php index 117b52265bc..7e15a8fdcba 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/ProjectionSubscriber.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/ProjectionSubscriber.php @@ -13,7 +13,7 @@ use Neos\EventStore\Model\EventEnvelope; /** - * @internal + * @internal implementation detail of the catchup */ final class ProjectionSubscriber { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/Subscribers.php b/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/Subscribers.php index ba40fbddb1a..eba25e39a1a 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/Subscribers.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Subscriber/Subscribers.php @@ -7,8 +7,14 @@ use Neos\ContentRepository\Core\Subscription\SubscriptionId; /** + * A collection of the registered subscribers. + * + * Currently only projections are the available subscribers, but when the concept is extended, + * other *Subscriber value objects will also be hold in this set. + * Like a possible "ListeningSubscriber" to only listen to events without the capabilities of a full-blown projection. + * * @implements \IteratorAggregate - * @internal + * @internal implementation detail of the catchup */ final class Subscribers implements \IteratorAggregate, \Countable, \JsonSerializable { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Subscription.php b/Neos.ContentRepository.Core/Classes/Subscription/Subscription.php index 7a6c22ab82f..902f7210db7 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Subscription.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Subscription.php @@ -11,7 +11,7 @@ /** * Note: This class is mutable by design! * - * @internal + * @internal implementation detail of the catchup */ final class Subscription { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionAndProjectionStatuses.php b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionAndProjectionStatuses.php deleted file mode 100644 index 47c237813b5..00000000000 --- a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionAndProjectionStatuses.php +++ /dev/null @@ -1,54 +0,0 @@ - - */ -final readonly class SubscriptionAndProjectionStatuses implements \IteratorAggregate -{ - /** - * @var array $statuses - */ - private array $statuses; - - private function __construct( - SubscriptionAndProjectionStatus ...$statuses, - ) { - $this->statuses = $statuses; - } - - public static function createEmpty(): self - { - return new self(); - } - - /** - * @param array $statuses - */ - public static function fromArray(array $statuses): self - { - return new self(...$statuses); - } - - public function first(): ?SubscriptionAndProjectionStatus - { - foreach ($this->statuses as $status) { - return $status; - } - return null; - } - - public function getIterator(): \Traversable - { - yield from $this->statuses; - } - - public function isEmpty(): bool - { - return $this->statuses === []; - } -} diff --git a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionError.php b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionError.php index 5c53135dc66..1adc7e2cfe1 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionError.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionError.php @@ -5,7 +5,7 @@ namespace Neos\ContentRepository\Core\Subscription; /** - * @api + * @api part of the subscription status */ final class SubscriptionError { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionId.php b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionId.php index 777814c275e..668c85f0b78 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionId.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionId.php @@ -5,7 +5,7 @@ namespace Neos\ContentRepository\Core\Subscription; /** - * @api + * @api identifier for a registered subscription */ final class SubscriptionId { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionIds.php b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionIds.php index 7b672d887ec..9e81ae56ef2 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionIds.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionIds.php @@ -6,7 +6,7 @@ /** * @implements \IteratorAggregate - * @api + * @internal implementation detail of the catchup */ final class SubscriptionIds implements \IteratorAggregate, \Countable, \JsonSerializable { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatus.php b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatus.php index f1b29c7e572..487f742fbab 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatus.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatus.php @@ -5,7 +5,7 @@ namespace Neos\ContentRepository\Core\Subscription; /** - * @api + * @api part of the subscription status */ enum SubscriptionStatus : string { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatusCollection.php b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatusCollection.php new file mode 100644 index 00000000000..4ee36d68ded --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatusCollection.php @@ -0,0 +1,63 @@ + + */ +final readonly class SubscriptionStatusCollection implements \IteratorAggregate +{ + /** + * @var array $items + */ + private array $items; + + private function __construct( + ProjectionSubscriptionStatus|DetachedSubscriptionStatus ...$items, + ) { + $this->items = $items; + } + + public static function createEmpty(): self + { + return new self(); + } + + /** + * @param array $items + */ + public static function fromArray(array $items): self + { + return new self(...$items); + } + + public function first(): ProjectionSubscriptionStatus|DetachedSubscriptionStatus|null + { + foreach ($this->items as $status) { + return $status; + } + return null; + } + + public function getIterator(): \Traversable + { + yield from $this->items; + } + + public function isEmpty(): bool + { + return $this->items === []; + } +} diff --git a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatusFilter.php b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatusFilter.php index adc3fa4e51b..e531c180329 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatusFilter.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/SubscriptionStatusFilter.php @@ -6,7 +6,7 @@ /** * @implements \IteratorAggregate - * @api + * @internal implementation detail of the catchup */ final class SubscriptionStatusFilter implements \IteratorAggregate { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Subscriptions.php b/Neos.ContentRepository.Core/Classes/Subscription/Subscriptions.php index 03a09abe0f1..3bba94d0018 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Subscriptions.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Subscriptions.php @@ -8,7 +8,7 @@ /** * @implements \IteratorAggregate - * @api + * @internal implementation detail of the catchup */ final class Subscriptions implements \IteratorAggregate, \Countable, \JsonSerializable { diff --git a/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/CRTestSuiteTrait.php b/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/CRTestSuiteTrait.php index f65d0de69fc..c06fa32af4c 100644 --- a/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/CRTestSuiteTrait.php +++ b/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/CRTestSuiteTrait.php @@ -27,10 +27,12 @@ use Neos\ContentRepository\Core\Projection\ContentGraph\Filter\NodeType\NodeTypeCriteria; use Neos\ContentRepository\Core\Projection\ContentGraph\Subtree; use Neos\ContentRepository\Core\Projection\ContentGraph\VisibilityConstraints; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory; use Neos\ContentRepository\Core\Service\ContentStreamPrunerFactory; use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; +use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\TestSuite\Behavior\Features\Bootstrap\Features\ContentStreamClosing; use Neos\ContentRepository\TestSuite\Behavior\Features\Bootstrap\Features\NodeCreation; use Neos\ContentRepository\TestSuite\Behavior\Features\Bootstrap\Features\NodeModification; @@ -254,8 +256,9 @@ abstract protected function getContentRepositoryService( */ public function iReplayTheProjection(string $projectionName): void { - $this->currentContentRepository->resetProjectionState($projectionName); - $this->currentContentRepository->catchUpProjection($projectionName, CatchUpOptions::create()); + $contentRepositoryMaintainer = $this->getContentRepositoryService(new ContentRepositoryMaintainerFactory()); + $result = $contentRepositoryMaintainer->replaySubscription(SubscriptionId::fromString($projectionName)); + Assert::assertNull($result); } protected function deserializeProperties(array $properties): PropertyValuesToWrite diff --git a/Neos.ContentRepositoryRegistry.TestSuite/Classes/Behavior/CRRegistrySubjectProvider.php b/Neos.ContentRepositoryRegistry.TestSuite/Classes/Behavior/CRRegistrySubjectProvider.php index 51216bbbd7a..e4eeaedd33b 100644 --- a/Neos.ContentRepositoryRegistry.TestSuite/Classes/Behavior/CRRegistrySubjectProvider.php +++ b/Neos.ContentRepositoryRegistry.TestSuite/Classes/Behavior/CRRegistrySubjectProvider.php @@ -13,15 +13,14 @@ * source code. */ -use Doctrine\DBAL\Connection; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceFactoryInterface; use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface; -use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; use Neos\ContentRepositoryRegistry\Exception\ContentRepositoryNotFoundException; -use Neos\EventStore\EventStoreInterface; +use PHPUnit\Framework\Assert; /** * The CR registry subject provider trait for behavioral tests @@ -53,24 +52,18 @@ protected function setUpCRRegistry(): void /** * @Given /^I initialize content repository "([^"]*)"$/ */ - public function iInitializeContentRepository(string $contentRepositoryId): void + public function iInitializeContentRepository(string $rawContentRepositoryId): void { - $contentRepository = $this->getContentRepository(ContentRepositoryId::fromString($contentRepositoryId)); - $subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepository->id, new SubscriptionServiceFactory()); - /** @var EventStoreInterface $eventStore */ - $eventStore = (new \ReflectionClass($contentRepository))->getProperty('eventStore')->getValue($contentRepository); - /** @var Connection $databaseConnection */ - $databaseConnection = (new \ReflectionClass($eventStore))->getProperty('connection')->getValue($eventStore); - $eventTableName = sprintf('cr_%s_events', $contentRepositoryId); - $databaseConnection->executeStatement('TRUNCATE ' . $eventTableName); + $contentRepositoryId = ContentRepositoryId::fromString($rawContentRepositoryId); - if (!in_array($contentRepository->id, self::$alreadySetUpContentRepositories)) { - $subscriptionService->setupEventStore(); - $subscriptionService->subscriptionEngine->setup(); - self::$alreadySetUpContentRepositories[] = $contentRepository->id; + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); + if (!in_array($contentRepositoryId, self::$alreadySetUpContentRepositories)) { + $result = $contentRepositoryMaintainer->setUp(); + Assert::assertNull($result); + self::$alreadySetUpContentRepositories[] = $contentRepositoryId; } - $subscriptionService->subscriptionEngine->reset(); - $subscriptionService->subscriptionEngine->boot(); + $result = $contentRepositoryMaintainer->prune(); + Assert::assertNull($result); } /** diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/CrCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/CrCommandController.php index 0e8ee668a87..c51957ed96c 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Command/CrCommandController.php +++ b/Neos.ContentRepositoryRegistry/Classes/Command/CrCommandController.php @@ -4,26 +4,38 @@ namespace Neos\ContentRepositoryRegistry\Command; use Neos\ContentRepository\Core\Projection\ProjectionStatusType; -use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; +use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; -use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventStore\StatusType; +use Neos\Flow\Annotations as Flow; use Neos\Flow\Cli\CommandController; -use stdClass; -use Symfony\Component\Console\Helper\ProgressBar; -use Symfony\Component\Console\Output\ConsoleOutput; use Symfony\Component\Console\Output\Output; +/** + * Set up a content repository + * + * *Initialisation* + * + * The command "./flow cr:setup" sets up the content repository like event store and subscription database tables. + * It is non-destructive. + * + * Note that a reset is not implemented here but for the Neos CMS use-case provided via "./flow site:pruneAll" + * + * *Staus information* + * + * The status of the content repository e.g. if a setup is required or if all subscriptions are active and their position + * can be examined with "./flow cr:status" + * + * See also {@see ContentRepositoryMaintainer} for more information. + */ final class CrCommandController extends CommandController { - - public function __construct( - private readonly ContentRepositoryRegistry $contentRepositoryRegistry, - ) { - parent::__construct(); - } + #[Flow\Inject()] + protected ContentRepositoryRegistry $contentRepositoryRegistry; /** * Sets up and checks required dependencies for a Content Repository instance @@ -35,78 +47,23 @@ public function __construct( * To check if the content repository needs to be setup look into cr:status. * That command will also display information what is about to be migrated. * + * @param bool $quiet If set, no output is generated. This is useful if only the exit code (0 = all OK, 1 = errors or warnings) is of interest * @param string $contentRepository Identifier of the Content Repository to set up */ - public function setupCommand(string $contentRepository = 'default'): void - { - $contentRepositoryId = ContentRepositoryId::fromString($contentRepository); - $subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory()); - $subscriptionService->setupEventStore(); - $setupResult = $subscriptionService->subscriptionEngine->setup(); - if ($setupResult->errors === null) { - $this->outputLine('Content Repository "%s" was set up', [$contentRepositoryId->value]); - return; - } - $this->outputLine('Setup of Content Repository "%s" produced the following error%s', [$contentRepositoryId->value, $setupResult->errors->count() === 1 ? '' : 's']); - foreach ($setupResult->errors as $error) { - $this->outputLine('Subscription "%s": %s', [$error->subscriptionId->value, $error->message]); - } - $this->quit(1); - } - - public function subscriptionsBootCommand(string $contentRepository = 'default', bool $quiet = false): void + public function setupCommand(string $contentRepository = 'default', bool $quiet = false): void { - $contentRepositoryId = ContentRepositoryId::fromString($contentRepository); - $subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory()); - if (!$quiet) { - $this->outputLine('Booting new subscriptions'); - // render memory consumption and time remaining - $this->output->getProgressBar()->setFormat('debug'); - $this->output->progressStart(); - $bootResult = $subscriptionService->subscriptionEngine->boot(progressCallback: fn () => $this->output->progressAdvance()); - $this->output->progressFinish(); - $this->outputLine(); - if ($bootResult->hasFailed() === false) { - $this->outputLine('Done'); - return; - } - } else { - $bootResult = $subscriptionService->subscriptionEngine->boot(); - } - if ($bootResult->hasFailed()) { - $this->outputLine('Booting of Content Repository "%s" produced the following error%s', [$contentRepositoryId->value, $bootResult->errors->count() === 1 ? '' : 's']); - foreach ($bootResult->errors as $error) { - $this->outputLine('Subscription "%s": %s', [$error->subscriptionId->value, $error->message]); - } - $this->quit(1); + if ($quiet) { + $this->output->getOutput()->setVerbosity(Output::VERBOSITY_QUIET); } - } - - public function subscriptionsCatchUpCommand(string $contentRepository = 'default', bool $quiet = false): void - { $contentRepositoryId = ContentRepositoryId::fromString($contentRepository); - $subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory()); - $subscriptionService->subscriptionEngine->catchUpActive(); - } + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); - public function subscriptionsResetCommand(string $contentRepository = 'default', bool $force = false): void - { - if (!$force && !$this->output->askConfirmation('Are you sure? (y/n) ', false)) { - $this->outputLine('Cancelled'); - $this->quit(); - } - $contentRepositoryId = ContentRepositoryId::fromString($contentRepository); - $subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory()); - $resetResult = $subscriptionService->subscriptionEngine->reset(); - if ($resetResult->errors === null) { - $this->outputLine('Content Repository "%s" was reset', [$contentRepositoryId->value]); - return; - } - $this->outputLine('Reset of Content Repository "%s" produced the following error%s', [$contentRepositoryId->value, $resetResult->errors->count() === 1 ? '' : 's']); - foreach ($resetResult->errors as $error) { - $this->outputLine('Subscription "%s": %s', [$error->subscriptionId->value, $error->message]); + $result = $contentRepositoryMaintainer->setUp(); + if ($result !== null) { + $this->outputLine('%s', [$result->getMessage()]); + $this->quit(1); } - $this->quit(1); + $this->outputLine('Content Repository "%s" was set up', [$contentRepositoryId->value]); } /** @@ -124,67 +81,82 @@ public function statusCommand(string $contentRepository = 'default', bool $verbo $this->output->getOutput()->setVerbosity(Output::VERBOSITY_QUIET); } $contentRepositoryId = ContentRepositoryId::fromString($contentRepository); - $subscriptionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory()); - $eventStoreStatus = $subscriptionService->eventStoreStatus(); + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); + $crStatus = $contentRepositoryMaintainer->status(); $hasErrors = false; + $reactivationRequired = false; $setupRequired = false; $bootingRequired = false; - $resetRequired = false; - - $this->output('Event Store: '); - $this->outputLine(match ($eventStoreStatus->type) { + $this->outputLine('Event Store:'); + $this->output(' Setup: '); + $this->outputLine(match ($crStatus->eventStoreStatus->type) { StatusType::OK => 'OK', StatusType::SETUP_REQUIRED => 'Setup required!', StatusType::ERROR => 'ERROR', }); - $hasErrors |= $eventStoreStatus->type === StatusType::ERROR; - if ($verbose && $eventStoreStatus->details !== '') { - $this->outputFormatted($eventStoreStatus->details, [], 2); + if ($crStatus->eventStorePosition) { + $this->outputLine(' Position: %d', [$crStatus->eventStorePosition->value]); + } else { + $this->outputLine(' Position: Loading failed!'); + } + $hasErrors |= $crStatus->eventStoreStatus->type === StatusType::ERROR; + if ($verbose && $crStatus->eventStoreStatus->details !== '') { + $this->outputFormatted($crStatus->eventStoreStatus->details, [], 2); } $this->outputLine(); $this->outputLine('Subscriptions:'); - $subscriptionStatuses = $subscriptionService->subscriptionEngine->subscriptionStatuses(); - if ($subscriptionStatuses->isEmpty()) { + if ($crStatus->subscriptionStatus->isEmpty()) { $this->outputLine('There are no registered subscriptions yet, please run ./flow cr:setup'); $this->quit(1); } - foreach ($subscriptionStatuses as $status) { - $this->outputLine(' %s:', [$status->subscriptionId->value]); - $this->output(' Subscription: ', [$status->subscriptionId->value]); - $this->output(match ($status->subscriptionStatus) { - SubscriptionStatus::NEW => 'NEW', - SubscriptionStatus::BOOTING => 'BOOTING', - SubscriptionStatus::ACTIVE => 'ACTIVE', - SubscriptionStatus::DETACHED => 'DETACHED', - SubscriptionStatus::ERROR => 'ERROR', - }); - $this->outputLine(' at position %d', [$status->subscriptionPosition->value]); - $hasErrors |= $status->subscriptionStatus === SubscriptionStatus::ERROR; - $bootingRequired |= $status->subscriptionStatus === SubscriptionStatus::BOOTING; - if ($verbose && $status->subscriptionError !== null) { - $lines = explode(chr(10), $status->subscriptionError->errorMessage ?: 'No details available.'); - foreach ($lines as $line) { - $this->outputLine(' %s', [$line]); - } + foreach ($crStatus->subscriptionStatus as $status) { + if ($status instanceof DetachedSubscriptionStatus) { + $this->outputLine(' %s:', [$status->subscriptionId->value]); + $this->output(' Subscription: '); + $this->output('%s DETACHED', [$status->subscriptionId->value, $status->subscriptionStatus === SubscriptionStatus::DETACHED ? 'is' : 'will be']); + $this->outputLine(' at position %d', [$status->subscriptionPosition->value]); } - if ($status->projectionStatus !== null) { - $this->output(' Projection: '); - $this->outputLine(match ($status->projectionStatus->type) { + if ($status instanceof ProjectionSubscriptionStatus) { + $this->outputLine(' %s:', [$status->subscriptionId->value]); + $this->output(' Setup: '); + $this->outputLine(match ($status->setupStatus->type) { ProjectionStatusType::OK => 'OK', - ProjectionStatusType::SETUP_REQUIRED => 'Setup required!', - ProjectionStatusType::REPLAY_REQUIRED => 'Replay required!', + ProjectionStatusType::SETUP_REQUIRED => 'SETUP REQUIRED', ProjectionStatusType::ERROR => 'ERROR', }); - $hasErrors |= $status->projectionStatus->type === ProjectionStatusType::ERROR; - $setupRequired |= $status->projectionStatus->type === ProjectionStatusType::SETUP_REQUIRED; - $resetRequired |= $status->projectionStatus->type === ProjectionStatusType::REPLAY_REQUIRED; - if ($verbose && ($status->projectionStatus->type !== ProjectionStatusType::OK || $status->projectionStatus->details)) { - $lines = explode(chr(10), $status->projectionStatus->details ?: 'No details available.'); + $hasErrors |= $status->setupStatus->type === ProjectionStatusType::ERROR; + $setupRequired |= $status->setupStatus->type === ProjectionStatusType::SETUP_REQUIRED; + if ($verbose && ($status->setupStatus->type !== ProjectionStatusType::OK || $status->setupStatus->details)) { + $lines = explode(chr(10), $status->setupStatus->details ?: 'No details available.'); foreach ($lines as $line) { $this->outputLine(' ' . $line); } $this->outputLine(); } + $this->output(' Projection: '); + $this->output(match ($status->subscriptionStatus) { + SubscriptionStatus::NEW => 'NEW', + SubscriptionStatus::BOOTING => 'BOOTING', + SubscriptionStatus::ACTIVE => 'ACTIVE', + SubscriptionStatus::DETACHED => 'DETACHED', + SubscriptionStatus::ERROR => 'ERROR', + }); + if ($crStatus->eventStorePosition?->value > $status->subscriptionPosition->value) { + // projection is behind + $this->outputLine(' at position %d', [$status->subscriptionPosition->value]); + } else { + $this->outputLine(' at position %d', [$status->subscriptionPosition->value]); + } + $hasErrors |= $status->subscriptionStatus === SubscriptionStatus::ERROR; + $reactivationRequired |= $status->subscriptionStatus === SubscriptionStatus::ERROR; + $bootingRequired |= $status->subscriptionStatus === SubscriptionStatus::BOOTING; + $reactivationRequired |= $status->subscriptionStatus === SubscriptionStatus::DETACHED; + if ($verbose && $status->subscriptionError !== null) { + $lines = explode(chr(10), $status->subscriptionError->errorMessage ?: 'No details available.'); + foreach ($lines as $line) { + $this->outputLine(' %s', [$line]); + } + } } } if ($verbose) { @@ -193,17 +165,67 @@ public function statusCommand(string $contentRepository = 'default', bool $verbo $this->outputLine('Setup required, please run ./flow cr:setup'); } if ($bootingRequired) { - $this->outputLine('Some subscriptions need to be booted, please run ./flow cr:subscriptionsboot'); - } - if ($resetRequired) { - $this->outputLine('Some subscriptions need to be replayed, please run ./flow cr:subscriptionsreset'); + $this->outputLine('Replay needed for BOOTING projections, please run ./flow subscription:replay [subscription-id]'); } - if ($hasErrors) { - $this->outputLine('Some subscriptions/projections have failed'); + if ($reactivationRequired) { + $this->outputLine('Reactivation of ERROR or DETACHED projection required, please run ./flow subscription:reactivate [subscription-id]'); } } if ($hasErrors) { $this->quit(1); } } + + /** + * Replays the specified projection of a Content Repository by resetting its state and performing a full catchup. + * + * @param string $projection Identifier of the projection to replay + * @param string $contentRepository Identifier of the Content Repository instance to operate on + * @param bool $force Replay the projection without confirmation. This may take some time! + * @param bool $quiet If set only fatal errors are rendered to the output (must be used with --force flag to avoid user input) + * @internal + * @deprecated with Neos 9 Beta 17, please use ./flow subscription:replay instead + */ + public function projectionReplayCommand(string $projection, string $contentRepository = 'default', bool $force = false, bool $quiet = false): void + { + $subscriptionId = match($projection) { + 'doctrineDbalContentGraph', + 'Neos\ContentGraph\DoctrineDbalAdapter\DoctrineDbalContentGraphProjection' => 'contentGraph', + 'documentUriPathProjection' => 'Neos.Neos:DocumentUriPathProjection', + 'change' => 'Neos.Neos:PendingChangesProjection', + default => null + }; + if ($subscriptionId === null) { + $this->outputLine('Invalid --projection specified. Please use ./flow subscription:replay [contentGraph|Neos.Neos:DocumentUriPathProjection|...] directly.'); + $this->quit(1); + } + $this->outputLine('Please use ./flow subscription:replay %s instead!', [$subscriptionId]); + $this->forward( + 'replay', + SubscriptionCommandController::class, + array_merge( + ['subscription' => $subscriptionId], + compact('contentRepository', 'force', 'quiet') + ) + ); + } + + /** + * Replays all projections of the specified Content Repository by resetting their states and performing a full catchup + * + * @param string $contentRepository Identifier of the Content Repository instance to operate on + * @param bool $force Replay the projection without confirmation. This may take some time! + * @param bool $quiet If set only fatal errors are rendered to the output (must be used with --force flag to avoid user input) + * @internal + * @deprecated with Neos 9 Beta 17, please use ./flow subscription:replayall instead + */ + public function projectionReplayAllCommand(string $contentRepository = 'default', bool $force = false, bool $quiet = false): void + { + $this->outputLine('Please use ./flow subscription:replayall instead!'); + $this->forward( + 'replayall', + SubscriptionCommandController::class, + compact('contentRepository', 'force', 'quiet') + ); + } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubscriptionCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubscriptionCommandController.php new file mode 100644 index 00000000000..4da391f52c7 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Classes/Command/SubscriptionCommandController.php @@ -0,0 +1,180 @@ +output->getOutput()->setVerbosity(Output::VERBOSITY_QUIET); + } + if (!$force && $quiet) { + $this->outputLine('Cannot run in quiet mode without --force. Please acknowledge that this command will reset and replay this subscription. This may take some time.'); + $this->quit(1); + } + + if (!$force && !$this->output->askConfirmation(sprintf('> This will replay the subscription "%s" in "%s", which may take some time. Are you sure to proceed? (y/n) ', $subscription, $contentRepository), false)) { + $this->outputLine('Abort.'); + return; + } + + $contentRepositoryId = ContentRepositoryId::fromString($contentRepository); + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); + + $progressCallback = null; + if (!$quiet) { + $this->outputLine('Replaying events for subscription "%s" of Content Repository "%s" ...', [$subscription, $contentRepositoryId->value]); + // render memory consumption and time remaining + $this->output->getProgressBar()->setFormat('debug'); + $this->output->progressStart(); + $progressCallback = fn () => $this->output->progressAdvance(); + } + + $result = $contentRepositoryMaintainer->replaySubscription(SubscriptionId::fromString($subscription), progressCallback: $progressCallback); + + if (!$quiet) { + $this->output->progressFinish(); + $this->outputLine(); + } + + if ($result !== null) { + $this->outputLine('%s', [$result->getMessage()]); + $this->quit(1); + } elseif (!$quiet) { + $this->outputLine('Done.'); + } + } + + /** + * Replays all projections of the specified Content Repository by resetting their states and performing a full catchup + * + * @param string $contentRepository Identifier of the Content Repository instance to operate on + * @param bool $force Replay all subscriptions without confirmation. This may take some time! + * @param bool $quiet If set only fatal errors are rendered to the output (must be used with --force flag to avoid user input) + */ + public function replayAllCommand(string $contentRepository = 'default', bool $force = false, bool $quiet = false): void + { + if ($quiet) { + $this->output->getOutput()->setVerbosity(Output::VERBOSITY_QUIET); + } + + if (!$force && $quiet) { + $this->outputLine('Cannot run in quiet mode without --force. Please acknowledge that this command will reset and replay all subscriptions. This may take some time.'); + $this->quit(1); + } + + if (!$force && !$this->output->askConfirmation(sprintf('> This will replay all projections in "%s", which may take some time. Are you sure to proceed? (y/n) ', $contentRepository), false)) { + $this->outputLine('Abort.'); + return; + } + + $contentRepositoryId = ContentRepositoryId::fromString($contentRepository); + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); + + $progressCallback = null; + if (!$quiet) { + $this->outputLine('Replaying events for all projections of Content Repository "%s" ...', [$contentRepositoryId->value]); + // render memory consumption and time remaining + // todo maybe reintroduce pretty output: https://github.com/neos/neos-development-collection/pull/5010 but without using highestSequenceNumber + $this->output->getProgressBar()->setFormat('debug'); + $this->output->progressStart(); + $progressCallback = fn () => $this->output->progressAdvance(); + } + + $result = $contentRepositoryMaintainer->replayAllSubscriptions(progressCallback: $progressCallback); + + if (!$quiet) { + $this->output->progressFinish(); + $this->outputLine(); + } + + if ($result !== null) { + $this->outputLine('%s', [$result->getMessage()]); + $this->quit(1); + } elseif (!$quiet) { + $this->outputLine('Done.'); + } + } + + /** + * Reactivate a subscription + * + * The explicit catchup is only needed for projections in the error or detached status with an advanced position. + * Running a full replay would work but might be overkill, instead this reactivation will just attempt + * catchup the subscription back to active from its current position. + * + * @param string $subscription Identifier of the subscription to reactivate like it was configured (e.g. "contentGraph", "Vendor.Package:YourProjection") + * @param string $contentRepository Identifier of the Content Repository instance to operate on + * @param bool $quiet If set only fatal errors are rendered to the output (must be used with --force flag to avoid user input) + */ + public function reactivateCommand(string $subscription, string $contentRepository = 'default', bool $quiet = false): void + { + $contentRepositoryId = ContentRepositoryId::fromString($contentRepository); + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); + + $progressCallback = null; + if (!$quiet) { + $this->outputLine('Reactivate subscription "%s" of Content Repository "%s" ...', [$subscription, $contentRepositoryId->value]); + // render memory consumption and time remaining + $this->output->getProgressBar()->setFormat('debug'); + $this->output->progressStart(); + $progressCallback = fn () => $this->output->progressAdvance(); + } + + $result = $contentRepositoryMaintainer->reactivateSubscription(SubscriptionId::fromString($subscription), progressCallback: $progressCallback); + + if (!$quiet) { + $this->output->progressFinish(); + $this->outputLine(); + } + + if ($result !== null) { + $this->outputLine('%s', [$result->getMessage()]); + $this->quit(1); + } elseif (!$quiet) { + $this->outputLine('Done.'); + } + } +} diff --git a/Neos.ContentRepositoryRegistry/Classes/Processors/ProjectionCatchupProcessor.php b/Neos.ContentRepositoryRegistry/Classes/Processors/ProjectionCatchupProcessor.php deleted file mode 100644 index 71bada85b93..00000000000 --- a/Neos.ContentRepositoryRegistry/Classes/Processors/ProjectionCatchupProcessor.php +++ /dev/null @@ -1,24 +0,0 @@ -subscriptionService->subscriptionEngine->catchUpActive(); - } -} diff --git a/Neos.ContentRepositoryRegistry/Classes/Processors/ProjectionResetProcessor.php b/Neos.ContentRepositoryRegistry/Classes/Processors/SubscriptionReplayProcessor.php similarity index 53% rename from Neos.ContentRepositoryRegistry/Classes/Processors/ProjectionResetProcessor.php rename to Neos.ContentRepositoryRegistry/Classes/Processors/SubscriptionReplayProcessor.php index 1dfdd7d4208..83ce99eaca0 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Processors/ProjectionResetProcessor.php +++ b/Neos.ContentRepositoryRegistry/Classes/Processors/SubscriptionReplayProcessor.php @@ -3,22 +3,22 @@ namespace Neos\ContentRepositoryRegistry\Processors; -use Neos\ContentRepository\Core\Service\SubscriptionService; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainer; use Neos\ContentRepository\Export\ProcessingContext; use Neos\ContentRepository\Export\ProcessorInterface; /** * @internal */ -final readonly class ProjectionResetProcessor implements ProcessorInterface +final readonly class SubscriptionReplayProcessor implements ProcessorInterface { public function __construct( - private SubscriptionService $subscriptionService, + private ContentRepositoryMaintainer $contentRepositoryMaintainer, ) { } public function run(ProcessingContext $context): void { - $this->subscriptionService->subscriptionEngine->reset(); + $this->contentRepositoryMaintainer->replayAllSubscriptions(); } } diff --git a/Neos.ContentRepositoryRegistry/Tests/Functional/ContentRepositoryMaintenanceCommandControllerTest.php b/Neos.ContentRepositoryRegistry/Tests/Functional/ContentRepositoryMaintenanceCommandControllerTest.php new file mode 100644 index 00000000000..cf8de0c0b88 --- /dev/null +++ b/Neos.ContentRepositoryRegistry/Tests/Functional/ContentRepositoryMaintenanceCommandControllerTest.php @@ -0,0 +1,142 @@ +crController = $this->getObject(CrCommandController::class); + $this->subscriptionController = $this->getObject(SubscriptionCommandController::class); + + $this->response = new Response(); + $this->bufferedOutput = new BufferedOutput(); + + ObjectAccess::setProperty($this->crController, 'response', $this->response, true); + ObjectAccess::getProperty($this->crController, 'output', true)->setOutput($this->bufferedOutput); + + ObjectAccess::setProperty($this->subscriptionController, 'response', $this->response, true); + ObjectAccess::getProperty($this->subscriptionController, 'output', true)->setOutput($this->bufferedOutput); + } + + /** @test */ + public function setupOnEmptyEventStore(): void + { + $this->fakeProjection->expects(self::once())->method('setUp'); + $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); + + $this->crController->setupCommand(contentRepository: $this->contentRepository->id->value, quiet: true); + self::assertEmpty($this->bufferedOutput->fetch()); + + // projections are marked active because the event store is empty + $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::none()); + $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); + + $this->crController->statusCommand(contentRepository: $this->contentRepository->id->value, quiet: true); + self::assertEmpty($this->bufferedOutput->fetch()); + } + + /** @test */ + public function setupOnModifiedEventStore(): void + { + $this->eventStore->setup(); + $this->commitExampleContentStreamEvent(); + + $this->fakeProjection->expects(self::once())->method('setUp'); + $this->fakeProjection->expects(self::once())->method('apply'); + $this->fakeProjection->expects(self::once())->method('resetState'); + $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); + + $this->crController->setupCommand(contentRepository: $this->contentRepository->id->value, quiet: true); + self::assertEmpty($this->bufferedOutput->fetch()); + + $this->expectOkayStatus('contentGraph', SubscriptionStatus::BOOTING, SequenceNumber::none()); + $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); + + $this->crController->statusCommand(contentRepository: $this->contentRepository->id->value, quiet: true); + self::assertEmpty($this->bufferedOutput->fetch()); + + $this->subscriptionController->replayCommand(subscription: 'contentGraph', contentRepository: $this->contentRepository->id->value, force: true, quiet: true); + self::assertEmpty($this->bufferedOutput->fetch()); + + $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); + $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); + + $this->subscriptionController->replayAllCommand(contentRepository: $this->contentRepository->id->value, force: true, quiet: true); + + $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); + $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); + } + + /** @test */ + public function projectionInError(): void + { + $this->eventStore->setup(); + $this->fakeProjection->expects(self::any())->method('setUp'); + $this->fakeProjection->expects(self::any())->method('apply'); + $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); + + $this->crController->setupCommand(contentRepository: $this->contentRepository->id->value, quiet: true); + self::assertEmpty($this->bufferedOutput->fetch()); + + $this->secondFakeProjection->injectSaboteur(fn () => throw new \RuntimeException('This projection is kaputt.')); + + try { + $this->contentRepository->handle(CreateRootWorkspace::create( + WorkspaceName::forLive(), + ContentStreamId::create() + )); + } catch (\RuntimeException) { + } + + self::assertEquals( + SubscriptionStatus::ERROR, + $this->subscriptionStatus('Vendor.Package:SecondFakeProjection')?->subscriptionStatus + ); + + try { + $this->crController->statusCommand(contentRepository: $this->contentRepository->id->value, quiet: true); + } catch (StopCommandException) { + } + // exit error code because one projection has a failure + self::assertEquals(1, $this->response->getExitCode()); + self::assertEmpty($this->bufferedOutput->fetch()); + + // repair projection + $this->secondFakeProjection->killSaboteur(); + $this->subscriptionController->reactivateCommand(subscription: 'Vendor.Package:SecondFakeProjection', contentRepository: $this->contentRepository->id->value, quiet: true); + + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2)); + } +} diff --git a/Neos.Neos/Classes/Domain/Pruning/ContentRepositoryPruningProcessor.php b/Neos.Neos/Classes/Domain/Pruning/ContentRepositoryPruningProcessor.php index 0b94195c9d1..9cb7cf60b24 100644 --- a/Neos.Neos/Classes/Domain/Pruning/ContentRepositoryPruningProcessor.php +++ b/Neos.Neos/Classes/Domain/Pruning/ContentRepositoryPruningProcessor.php @@ -14,22 +14,25 @@ namespace Neos\Neos\Domain\Pruning; -use Neos\ContentRepository\Core\Service\ContentStreamPruner; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainer; use Neos\ContentRepository\Export\ProcessingContext; use Neos\ContentRepository\Export\ProcessorInterface; /** - * Pruning processor that removes all events from the given cr + * Pruning processor that removes all events from the given cr and resets the projections */ final readonly class ContentRepositoryPruningProcessor implements ProcessorInterface { public function __construct( - private ContentStreamPruner $contentStreamPruner, + private ContentRepositoryMaintainer $contentRepositoryMaintainer, ) { } public function run(ProcessingContext $context): void { - $this->contentStreamPruner->pruneAllWorkspacesAndContentStreamsFromEventStream(); + $result = $this->contentRepositoryMaintainer->prune(); + if ($result !== null) { + throw new \RuntimeException($result->getMessage(), 1732461335); + } } } diff --git a/Neos.Neos/Classes/Domain/Service/SiteImportService.php b/Neos.Neos/Classes/Domain/Service/SiteImportService.php index 494b4779c8f..ebe76134d19 100644 --- a/Neos.Neos/Classes/Domain/Service/SiteImportService.php +++ b/Neos.Neos/Classes/Domain/Service/SiteImportService.php @@ -17,10 +17,12 @@ use Doctrine\DBAL\Exception as DBALException; use League\Flysystem\Filesystem; use League\Flysystem\Local\LocalFilesystemAdapter; -use Neos\ContentRepository\Core\ContentRepository; -use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory; +use Neos\ContentRepository\Core\Projection\ProjectionStatusType; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainer; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; +use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Export\Factory\EventStoreImportProcessorFactory; use Neos\ContentRepository\Export\ProcessingContext; use Neos\ContentRepository\Export\ProcessorInterface; @@ -28,7 +30,8 @@ use Neos\ContentRepository\Export\Processors\AssetRepositoryImportProcessor; use Neos\ContentRepository\Export\Severity; use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; -use Neos\ContentRepositoryRegistry\Processors\ProjectionCatchupProcessor; +use Neos\ContentRepositoryRegistry\Processors\SubscriptionReplayProcessor; +use Neos\EventStore\Model\EventStore\StatusType; use Neos\Flow\Annotations as Flow; use Neos\Flow\Persistence\Doctrine\Service as DoctrineService; use Neos\Flow\Persistence\PersistenceManagerInterface; @@ -67,8 +70,10 @@ public function importFromPath(ContentRepositoryId $contentRepositoryId, string } $contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId); + $contentRepositoryMaintainer = $this->contentRepositoryRegistry->buildService($contentRepositoryId, new ContentRepositoryMaintainerFactory()); + $this->requireDataBaseSchemaToBeSetup(); - $this->requireContentRepositoryToBeSetup($contentRepository); + $this->requireContentRepositoryToBeSetup($contentRepositoryMaintainer, $contentRepositoryId); $filesystem = new Filesystem(new LocalFilesystemAdapter($path)); $context = new ProcessingContext($filesystem, $onMessage); @@ -78,7 +83,9 @@ public function importFromPath(ContentRepositoryId $contentRepositoryId, string 'Create Neos sites' => new SiteCreationProcessor($this->siteRepository, $this->domainRepository, $this->persistenceManager), 'Import events' => $this->contentRepositoryRegistry->buildService($contentRepositoryId, new EventStoreImportProcessorFactory(WorkspaceName::forLive(), keepEventIds: true)), 'Import assets' => new AssetRepositoryImportProcessor($this->assetRepository, $this->resourceRepository, $this->resourceManager, $this->persistenceManager), - 'Catchup all projections' => new ProjectionCatchupProcessor($this->contentRepositoryRegistry->buildService($contentRepositoryId, new SubscriptionServiceFactory())), + // WARNING! We do a replay here even though it will redo the live workspace creation. But otherwise the catchup hooks cannot determine that they need to be skipped as it seems like a regular catchup + // In case we allow to import events into other root workspaces, or don't expect live to be empty (see Import events), this would need to be adjusted, as otherwise existing data will be replayed + 'Replay all subscriptions' => new SubscriptionReplayProcessor($contentRepositoryMaintainer), ]); foreach ($processors as $processorLabel => $processor) { @@ -87,13 +94,19 @@ public function importFromPath(ContentRepositoryId $contentRepositoryId, string } } - private function requireContentRepositoryToBeSetup(ContentRepository $contentRepository): void + private function requireContentRepositoryToBeSetup(ContentRepositoryMaintainer $contentRepositoryMaintainer, ContentRepositoryId $contentRepositoryId): void { -// TODO reimplement -// $status = $contentRepository->status(); -// if (!$status->isOk()) { -// throw new \RuntimeException(sprintf('Content repository %s is not setup correctly, please run `./flow cr:setup`', $contentRepository->id->value)); -// } + $status = $contentRepositoryMaintainer->status(); + if ($status->eventStoreStatus->type !== StatusType::OK) { + throw new \RuntimeException(sprintf('Content repository %s is not setup correctly, please run `./flow cr:setup`', $contentRepositoryId->value)); + } + foreach ($status->subscriptionStatus as $status) { + if ($status instanceof ProjectionSubscriptionStatus) { + if ($status->setupStatus->type !== ProjectionStatusType::OK) { + throw new \RuntimeException(sprintf('Projection %s in content repository %s is not setup correctly, please run `./flow cr:setup`', $status->subscriptionId->value, $contentRepositoryId->value)); + } + } + } } private function requireDataBaseSchemaToBeSetup(): void diff --git a/Neos.Neos/Classes/Domain/Service/SitePruningService.php b/Neos.Neos/Classes/Domain/Service/SitePruningService.php index 968ec5d9427..18ab6f1d78f 100644 --- a/Neos.Neos/Classes/Domain/Service/SitePruningService.php +++ b/Neos.Neos/Classes/Domain/Service/SitePruningService.php @@ -16,8 +16,7 @@ use League\Flysystem\Filesystem; use League\Flysystem\Local\LocalFilesystemAdapter; -use Neos\ContentRepository\Core\Service\ContentStreamPrunerFactory; -use Neos\ContentRepository\Core\Service\SubscriptionServiceFactory; +use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainerFactory; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Export\ProcessingContext; @@ -25,7 +24,6 @@ use Neos\ContentRepository\Export\Processors; use Neos\ContentRepository\Export\Severity; use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry; -use Neos\ContentRepositoryRegistry\Processors\ProjectionResetProcessor; use Neos\Flow\Annotations as Flow; use Neos\Flow\Persistence\PersistenceManagerInterface; use Neos\Neos\Domain\Pruning\ContentRepositoryPruningProcessor; @@ -66,17 +64,11 @@ public function pruneAll(ContentRepositoryId $contentRepositoryId, \Closure $onP $this->domainRepository, $this->persistenceManager ), - 'Prune content repository' => new ContentRepositoryPruningProcessor( - $this->contentRepositoryRegistry->buildService( - $contentRepositoryId, - new ContentStreamPrunerFactory() - ) - ), 'Prune roles and metadata' => new RoleAndMetadataPruningProcessor($contentRepositoryId, $this->workspaceMetadataAndRoleRepository), - 'Reset all projections' => new ProjectionResetProcessor( + 'Prune content repository' => new ContentRepositoryPruningProcessor( $this->contentRepositoryRegistry->buildService( $contentRepositoryId, - new SubscriptionServiceFactory() + new ContentRepositoryMaintainerFactory() ) ) ]); diff --git a/Neos.Neos/Tests/Behavior/Features/FrontendRouting/Basic.feature b/Neos.Neos/Tests/Behavior/Features/FrontendRouting/Basic.feature index 640ac7e62bf..c9fde1e6e6b 100644 --- a/Neos.Neos/Tests/Behavior/Features/FrontendRouting/Basic.feature +++ b/Neos.Neos/Tests/Behavior/Features/FrontendRouting/Basic.feature @@ -113,7 +113,7 @@ Feature: Basic routing functionality (match & resolve document nodes in one dime # !!! when caches were still enabled (without calling DocumentUriPathFinder->disableCache()), the replay below will # show really "interesting" (non-correct) results. This was bug #4253. - When I replay the "Neos\Neos\FrontendRouting\Projection\DocumentUriPathProjection" projection + When I replay the "Neos.Neos:DocumentUriPathProjection" projection Then the node "sir-david-nodenborough" in content stream "cs-identifier" and dimension "{}" should resolve to URL "/david-nodenborough-updated-b" And the node "earl-o-documentbourgh" in content stream "cs-identifier" and dimension "{}" should resolve to URL "/david-nodenborough-updated-b/earl-document" diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index aab9e3e26d4..2477370866c 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -1,5 +1,20 @@ parameters: ignoreErrors: + - + message: "#^The internal method \"Neos\\\\ContentRepository\\\\Core\\\\Subscription\\\\SubscriptionIds\\:\\:toStringArray\" is called\\.$#" + count: 1 + path: Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php + + - + message: "#^The internal method \"Neos\\\\ContentRepository\\\\Core\\\\Subscription\\\\SubscriptionStatusFilter\\:\\:isEmpty\" is called\\.$#" + count: 1 + path: Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php + + - + message: "#^The internal method \"Neos\\\\ContentRepository\\\\Core\\\\Subscription\\\\SubscriptionStatusFilter\\:\\:toStringArray\" is called\\.$#" + count: 1 + path: Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php + - message: "#^Method Neos\\\\Neos\\\\Controller\\\\Backend\\\\MenuHelper\\:\\:buildModuleList\\(\\) return type has no value type specified in iterable type array\\.$#" count: 1