From 9f2eee95b46805c9782fee1999c450d05c906249 Mon Sep 17 00:00:00 2001 From: Ben Roberts Date: Mon, 11 Dec 2023 19:57:34 +0100 Subject: [PATCH] Refresh StoredEvent object and confirm still unpublished In extremely high concurrency situations, it was possible for the entities loaded in `PublishDomainEventSubscriber::publishEvents()` to be published by another process, before the subsequent lock was acquired. This commit enables a Doctrine "refresh" of the entity, inside the lock, which then allows us to perform a reliable lookup of the entity's published status, and then only publish if it is still unpublished. --- src/Doctrine/DoctrineEventStore.php | 28 ++++--------------- src/Domain/Model/EventStore.php | 10 ++----- src/Domain/Model/StoredEvent.php | 4 +-- .../PublishDomainEventSubscriber.php | 27 ++++++++++++------ 4 files changed, 27 insertions(+), 42 deletions(-) diff --git a/src/Doctrine/DoctrineEventStore.php b/src/Doctrine/DoctrineEventStore.php index d42f785..7f7c869 100644 --- a/src/Doctrine/DoctrineEventStore.php +++ b/src/Doctrine/DoctrineEventStore.php @@ -103,14 +103,11 @@ public function publish(StoredEvent $storedEvent): void */ public function allUnpublished(): array { - try - { + try { if (false === $this->em->getConnection()->createSchemaManager()->tablesExist([$this->tableName])) { return []; // Connection does exist, but the events table does not exist. } - } - catch (ConnectionException $connectionException) - { + } catch (ConnectionException $connectionException) { return []; // Connection itself does not exist } @@ -131,23 +128,8 @@ public function allUnpublished(): array return $qb->getQuery()->getResult(); } - /* - * @return StoredEvent[]|ArrayCollection - */ - /*public function allStoredEventsSince($eventId): ArrayCollection + public function refresh(StoredEvent $storedEvent): void { - $qb = $this->em->createQueryBuilder() - ->select('e') - ->from(StoredEvent::class, 'e') - ->orderBy('e.eventId'); - - if ($eventId) - { - $qb - ->where('se.eventId > :event_id') - ->setParameter('event_id', $eventId); - } - - return $qb->getQuery()->getResult(); - }*/ + $this->em->refresh($storedEvent); + } } diff --git a/src/Domain/Model/EventStore.php b/src/Domain/Model/EventStore.php index d20e527..a5b3d69 100644 --- a/src/Domain/Model/EventStore.php +++ b/src/Domain/Model/EventStore.php @@ -12,8 +12,6 @@ namespace Headsnet\DomainEventsBundle\Domain\Model; -use Doctrine\Common\Collections\ArrayCollection; - interface EventStore { public function nextIdentity(): EventId; @@ -22,16 +20,12 @@ public function append(DomainEvent $domainEvent): void; public function replace(DomainEvent $domainEvent): void; - public function publish(StoredEvent $domainEvent): void; + public function publish(StoredEvent $storedEvent): void; /** * @return StoredEvent[] */ public function allUnpublished(): array; - /* - * @param $eventId - * @return StoredEvent[]|ArrayCollection - */ - //public function allStoredEventsSince($eventId): ArrayCollection; + public function refresh(StoredEvent $storedEvent): void; } diff --git a/src/Domain/Model/StoredEvent.php b/src/Domain/Model/StoredEvent.php index b3f26cb..8e9c156 100644 --- a/src/Domain/Model/StoredEvent.php +++ b/src/Domain/Model/StoredEvent.php @@ -25,7 +25,7 @@ class StoredEvent private $occurredOn; /** - * @var \DateTimeImmutable + * @var \DateTimeImmutable|null */ private $publishedOn; @@ -68,7 +68,7 @@ public function getOccurredOn(): \DateTimeImmutable return $this->occurredOn; } - public function getPublishedOn(): \DateTimeImmutable + public function getPublishedOn(): ?\DateTimeImmutable { return $this->publishedOn; } diff --git a/src/EventSubscriber/PublishDomainEventSubscriber.php b/src/EventSubscriber/PublishDomainEventSubscriber.php index b59b557..04f2e8d 100644 --- a/src/EventSubscriber/PublishDomainEventSubscriber.php +++ b/src/EventSubscriber/PublishDomainEventSubscriber.php @@ -94,15 +94,24 @@ private function publishEvent(StoredEvent $storedEvent): void ); if ($lock->acquire()) { - $domainEvent = $this->serializer->deserialize( - $storedEvent->getEventBody(), - $storedEvent->getTypeName(), - 'json' - ); - assert($domainEvent instanceof DomainEvent); - - $this->domainEventDispatcher->dispatch($domainEvent); - $this->eventStore->publish($storedEvent); + // Refresh the entity from the database, in case another process has + // updated it between reading it above and then acquiring the lock here. + // THIS IS CRITICAL TO AVOID DUPLICATE PUBLISHING OF EVENTS + $this->eventStore->refresh($storedEvent); + + // If the event is definitely still unpublished, whilst we are here holding + // the lock, then we can proceed to publish it. + if ($storedEvent->getPublishedOn() === null) { + $domainEvent = $this->serializer->deserialize( + $storedEvent->getEventBody(), + $storedEvent->getTypeName(), + 'json' + ); + assert($domainEvent instanceof DomainEvent); + + $this->domainEventDispatcher->dispatch($domainEvent); + $this->eventStore->publish($storedEvent); + } $lock->release(); }