Skip to content

Commit

Permalink
Refresh StoredEvent object and confirm still unpublished (#36)
Browse files Browse the repository at this point in the history
In extremely high concurrency situations, it was possible for the
entities loaded in `PublishDomainEventSubscriber::publishEvents()` to be
published by another process, before the subsequent lock was acquired.

This commit enables a Doctrine "refresh" of the entity, inside the lock,
which then allows us to perform a reliable lookup of the entity's
published status, and then only publish if it is still unpublished.
  • Loading branch information
benr77 authored Jun 2, 2024
1 parent 2bc754f commit dddcf07
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 42 deletions.
28 changes: 5 additions & 23 deletions src/Doctrine/DoctrineEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,11 @@ public function publish(StoredEvent $storedEvent): void
*/
public function allUnpublished(): array
{
try
{
try {
if (false === $this->em->getConnection()->createSchemaManager()->tablesExist([$this->tableName])) {
return []; // Connection does exist, but the events table does not exist.
}
}
catch (ConnectionException $connectionException)
{
} catch (ConnectionException $connectionException) {
return []; // Connection itself does not exist
}

Expand All @@ -131,23 +128,8 @@ public function allUnpublished(): array
return $qb->getQuery()->getResult();
}

/*
* @return StoredEvent[]|ArrayCollection
*/
/*public function allStoredEventsSince($eventId): ArrayCollection
public function refresh(StoredEvent $storedEvent): void
{
$qb = $this->em->createQueryBuilder()
->select('e')
->from(StoredEvent::class, 'e')
->orderBy('e.eventId');
if ($eventId)
{
$qb
->where('se.eventId > :event_id')
->setParameter('event_id', $eventId);
}
return $qb->getQuery()->getResult();
}*/
$this->em->refresh($storedEvent);
}
}
10 changes: 2 additions & 8 deletions src/Domain/Model/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

namespace Headsnet\DomainEventsBundle\Domain\Model;

use Doctrine\Common\Collections\ArrayCollection;

interface EventStore
{
public function nextIdentity(): EventId;
Expand All @@ -22,16 +20,12 @@ public function append(DomainEvent $domainEvent): void;

public function replace(DomainEvent $domainEvent): void;

public function publish(StoredEvent $domainEvent): void;
public function publish(StoredEvent $storedEvent): void;

/**
* @return StoredEvent[]
*/
public function allUnpublished(): array;

/*
* @param $eventId
* @return StoredEvent[]|ArrayCollection
*/
//public function allStoredEventsSince($eventId): ArrayCollection;
public function refresh(StoredEvent $storedEvent): void;
}
4 changes: 2 additions & 2 deletions src/Domain/Model/StoredEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class StoredEvent
private $occurredOn;

/**
* @var \DateTimeImmutable
* @var \DateTimeImmutable|null
*/
private $publishedOn;

Expand Down Expand Up @@ -68,7 +68,7 @@ public function getOccurredOn(): \DateTimeImmutable
return $this->occurredOn;
}

public function getPublishedOn(): \DateTimeImmutable
public function getPublishedOn(): ?\DateTimeImmutable
{
return $this->publishedOn;
}
Expand Down
27 changes: 18 additions & 9 deletions src/EventSubscriber/PublishDomainEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,24 @@ private function publishEvent(StoredEvent $storedEvent): void
);

if ($lock->acquire()) {
$domainEvent = $this->serializer->deserialize(
$storedEvent->getEventBody(),
$storedEvent->getTypeName(),
'json'
);
assert($domainEvent instanceof DomainEvent);

$this->domainEventDispatcher->dispatch($domainEvent);
$this->eventStore->publish($storedEvent);
// Refresh the entity from the database, in case another process has
// updated it between reading it above and then acquiring the lock here.
// THIS IS CRITICAL TO AVOID DUPLICATE PUBLISHING OF EVENTS
$this->eventStore->refresh($storedEvent);

// If the event is definitely still unpublished, whilst we are here holding
// the lock, then we can proceed to publish it.
if ($storedEvent->getPublishedOn() === null) {
$domainEvent = $this->serializer->deserialize(
$storedEvent->getEventBody(),
$storedEvent->getTypeName(),
'json'
);
assert($domainEvent instanceof DomainEvent);

$this->domainEventDispatcher->dispatch($domainEvent);
$this->eventStore->publish($storedEvent);
}

$lock->release();
}
Expand Down

0 comments on commit dddcf07

Please sign in to comment.