diff --git a/Classes/Infrastructure/ProjectionTransactionTrait.php b/Classes/Infrastructure/ProjectionTransactionTrait.php new file mode 100644 index 0000000..2a6b9d6 --- /dev/null +++ b/Classes/Infrastructure/ProjectionTransactionTrait.php @@ -0,0 +1,37 @@ +dbal->isTransactionActive() === false) { + /** @phpstan-ignore argument.templateType */ + $this->dbal->transactional($closure); + return; + } + // technically we could leverage nested transactions from dbal, which effectively does the same. + // but that requires us to enable this globally first via setNestTransactionsWithSavepoints also making this explicit is more transparent: + $this->dbal->createSavepoint('PROJECTION'); + try { + $closure(); + } catch (\Throwable $e) { + // roll back the partially applied event on the projection + $this->dbal->rollbackSavepoint('PROJECTION'); + throw $e; + } + $this->dbal->releaseSavepoint('PROJECTION'); + } +} diff --git a/Classes/Projection/ProjectionInterface.php b/Classes/Projection/ProjectionInterface.php index aff5738..19090c1 100644 --- a/Classes/Projection/ProjectionInterface.php +++ b/Classes/Projection/ProjectionInterface.php @@ -30,6 +30,16 @@ public function setUp(): void; */ public function status(): ProjectionStatus; + /** + * Must invoke the closure which will update the catchup hooks and {@see apply}. + * Additionally, to guarantee exactly once delivery and also to behave correct during exceptions (even fatal ones), + * a database transaction should be started, or if a transaction is already active on the same connection save points + * must be used and rolled back on error. + * + * @param-immediately-invoked-callable $closure + */ + public function transactional(\Closure $closure): void; + public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void; /** diff --git a/Classes/Subscription/Engine/SubscriptionEngine.php b/Classes/Subscription/Engine/SubscriptionEngine.php index 8d556ec..93dd501 100644 --- a/Classes/Subscription/Engine/SubscriptionEngine.php +++ b/Classes/Subscription/Engine/SubscriptionEngine.php @@ -349,15 +349,12 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); continue; } - $this->subscriptionStore->createSavepoint(); $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id); if ($error !== null) { // ERROR Case: - // 1.) roll back the partially applied event on the subscriber - $this->subscriptionStore->rollbackSavepoint(); - // 2.) for the leftover events we are not including this failed subscription for catchup + // 1.) for the leftover events we are not including this failed subscription for catchup $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); - // 3.) update the subscription error state on either its unchanged or new position (if some events worked) + // 2.) update the subscription error state on either its unchanged or new position (if some events worked) $this->subscriptionStore->update( $subscription->id, status: SubscriptionStatus::ERROR, @@ -367,7 +364,7 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl $error->throwable ), ); - // 4.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed + // 3.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed // todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon try { $this->subscribers->get($subscription->id)->onAfterCatchUp(); @@ -381,7 +378,6 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl continue; } // HAPPY Case: - $this->subscriptionStore->releaseSavepoint(); $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; } $numberOfProcessedEvents++; diff --git a/Classes/Subscription/Store/SubscriptionStoreInterface.php b/Classes/Subscription/Store/SubscriptionStoreInterface.php index 85f3107..b7b0540 100644 --- a/Classes/Subscription/Store/SubscriptionStoreInterface.php +++ b/Classes/Subscription/Store/SubscriptionStoreInterface.php @@ -35,10 +35,4 @@ public function update( * @return T */ public function transactional(\Closure $closure): mixed; - - public function createSavepoint(): void; - - public function releaseSavepoint(): void; - - public function rollbackSavepoint(): void; } diff --git a/Classes/Subscription/Subscriber/ProjectionSubscriber.php b/Classes/Subscription/Subscriber/ProjectionSubscriber.php index 7e15a8f..ee19dbc 100644 --- a/Classes/Subscription/Subscriber/ProjectionSubscriber.php +++ b/Classes/Subscription/Subscriber/ProjectionSubscriber.php @@ -34,9 +34,11 @@ public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void public function handle(EventInterface $event, EventEnvelope $eventEnvelope): void { - $this->catchUpHook?->onBeforeEvent($event, $eventEnvelope); - $this->projection->apply($event, $eventEnvelope); - $this->catchUpHook?->onAfterEvent($event, $eventEnvelope); + $this->projection->transactional(function () use ($event, $eventEnvelope) { + $this->catchUpHook?->onBeforeEvent($event, $eventEnvelope); + $this->projection->apply($event, $eventEnvelope); + $this->catchUpHook?->onAfterEvent($event, $eventEnvelope); + }); } public function onAfterCatchUp(): void