Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh StoredEvent object and confirm still unpublished #36

Merged
merged 1 commit into from
Jun 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions src/Doctrine/DoctrineEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,11 @@ public function publish(StoredEvent $storedEvent): void
*/
public function allUnpublished(): array
{
try
{
try {
if (false === $this->em->getConnection()->createSchemaManager()->tablesExist([$this->tableName])) {
return []; // Connection does exist, but the events table does not exist.
}
}
catch (ConnectionException $connectionException)
{
} catch (ConnectionException $connectionException) {
return []; // Connection itself does not exist
}

Expand All @@ -131,23 +128,8 @@ public function allUnpublished(): array
return $qb->getQuery()->getResult();
}

/*
* @return StoredEvent[]|ArrayCollection
*/
/*public function allStoredEventsSince($eventId): ArrayCollection
public function refresh(StoredEvent $storedEvent): void
{
$qb = $this->em->createQueryBuilder()
->select('e')
->from(StoredEvent::class, 'e')
->orderBy('e.eventId');

if ($eventId)
{
$qb
->where('se.eventId > :event_id')
->setParameter('event_id', $eventId);
}

return $qb->getQuery()->getResult();
}*/
$this->em->refresh($storedEvent);
}
}
10 changes: 2 additions & 8 deletions src/Domain/Model/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

namespace Headsnet\DomainEventsBundle\Domain\Model;

use Doctrine\Common\Collections\ArrayCollection;

interface EventStore
{
public function nextIdentity(): EventId;
Expand All @@ -22,16 +20,12 @@ public function append(DomainEvent $domainEvent): void;

public function replace(DomainEvent $domainEvent): void;

public function publish(StoredEvent $domainEvent): void;
public function publish(StoredEvent $storedEvent): void;

/**
* @return StoredEvent[]
*/
public function allUnpublished(): array;

/*
* @param $eventId
* @return StoredEvent[]|ArrayCollection
*/
//public function allStoredEventsSince($eventId): ArrayCollection;
public function refresh(StoredEvent $storedEvent): void;
}
4 changes: 2 additions & 2 deletions src/Domain/Model/StoredEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class StoredEvent
private $occurredOn;

/**
* @var \DateTimeImmutable
* @var \DateTimeImmutable|null
*/
private $publishedOn;

Expand Down Expand Up @@ -68,7 +68,7 @@ public function getOccurredOn(): \DateTimeImmutable
return $this->occurredOn;
}

public function getPublishedOn(): \DateTimeImmutable
public function getPublishedOn(): ?\DateTimeImmutable
{
return $this->publishedOn;
}
Expand Down
27 changes: 18 additions & 9 deletions src/EventSubscriber/PublishDomainEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,24 @@ private function publishEvent(StoredEvent $storedEvent): void
);

if ($lock->acquire()) {
$domainEvent = $this->serializer->deserialize(
$storedEvent->getEventBody(),
$storedEvent->getTypeName(),
'json'
);
assert($domainEvent instanceof DomainEvent);

$this->domainEventDispatcher->dispatch($domainEvent);
$this->eventStore->publish($storedEvent);
// Refresh the entity from the database, in case another process has
// updated it between reading it above and then acquiring the lock here.
// THIS IS CRITICAL TO AVOID DUPLICATE PUBLISHING OF EVENTS
$this->eventStore->refresh($storedEvent);

// If the event is definitely still unpublished, whilst we are here holding
// the lock, then we can proceed to publish it.
if ($storedEvent->getPublishedOn() === null) {
$domainEvent = $this->serializer->deserialize(
$storedEvent->getEventBody(),
$storedEvent->getTypeName(),
'json'
);
assert($domainEvent instanceof DomainEvent);

$this->domainEventDispatcher->dispatch($domainEvent);
$this->eventStore->publish($storedEvent);
}

$lock->release();
}
Expand Down
Loading