diff --git a/src/Aggregate/AggregateEventEnvelope.php b/src/Aggregate/AggregateEventEnvelope.php new file mode 100644 index 0000000..640531c --- /dev/null +++ b/src/Aggregate/AggregateEventEnvelope.php @@ -0,0 +1,146 @@ +getMeta(GenericEvent::META_AGGREGATE_TYPE); + $aggregateId = $event->getMeta(GenericEvent::META_AGGREGATE_ID); + $aggregateVersion = $event->getMeta(GenericEvent::META_AGGREGATE_VERSION); + $metadata = $event->metadata(); + $rawPayload = $event->payload(); + $eventName = $event->messageName(); + + $event = $flavour->convertMessageReceivedFromNetwork($event, true); + + if($event instanceof MessageBag) { + $event = $event->get(MessageBag::MESSAGE); + } + + return new self( + $eventName, + $aggregateType, + $aggregateId, + $aggregateVersion, + $event, + $rawPayload, + $metadata + ); + } + + private function __construct( + string $eventName, + string $aggregateType, + string $aggregateId, + int $aggregateVersion, + object $event, + array $rawPayload, + array $metadata + ) { + $this->eventName = $eventName; + $this->aggregateType = $aggregateType; + $this->aggregateId = $aggregateId; + $this->aggregateVersion = $aggregateVersion; + $this->event = $event; + $this->rawPayload = $rawPayload; + $this->metadata = $metadata; + } + + /** + * @return string + */ + public function eventName(): string + { + return $this->eventName; + } + + /** + * @return mixed + */ + public function event() + { + return $this->event; + } + + /** + * @return string + */ + public function aggregateId(): string + { + return $this->aggregateId; + } + + /** + * @return string + */ + public function aggregateType(): string + { + return $this->aggregateType; + } + + /** + * @return int + */ + public function aggregateVersion(): int + { + return $this->aggregateVersion; + } + + /** + * @return array + */ + public function rawPayload(): array + { + return $this->rawPayload; + } + + /** + * @return array + */ + public function metadata(): array + { + return $this->metadata; + } +} diff --git a/src/EventEngine.php b/src/EventEngine.php index c1e844e..80d35a9 100644 --- a/src/EventEngine.php +++ b/src/EventEngine.php @@ -11,6 +11,7 @@ namespace EventEngine; +use EventEngine\Aggregate\AggregateEventEnvelope; use EventEngine\Aggregate\Exception\AggregateNotFound; use EventEngine\Aggregate\FlavouredAggregateRoot; use EventEngine\Aggregate\GenericAggregateRepository; @@ -34,6 +35,7 @@ use EventEngine\Messaging\GenericEvent; use EventEngine\Messaging\GenericSchemaMessageFactory; use EventEngine\Messaging\Message; +use EventEngine\Messaging\MessageBag; use EventEngine\Messaging\MessageDispatcher; use EventEngine\Messaging\MessageFactory; use EventEngine\Messaging\MessageFactoryAware; @@ -64,6 +66,7 @@ use EventEngine\Schema\Schema; use EventEngine\Schema\TypeSchema; use EventEngine\Schema\TypeSchemaMap; +use EventEngine\Util\MapIterator; use EventEngine\Util\MessageTuple; use EventEngine\Util\VariableType; use Psr\Container\ContainerInterface; @@ -1035,6 +1038,42 @@ public function loadAggregateState(string $aggregateType, string $aggregateId, i return $aggregate->currentState(); } + /** + * @param string $aggregateType + * @param string $aggregateId + * @param int $minVersion + * @param int|null $maxVersion + * @return \Iterator + */ + public function loadAggregateEvents( + string $aggregateType, + string $aggregateId, + int $minVersion = 1, + int $maxVersion = null + ): \Iterator + { + $this->assertBootstrapped(__METHOD__); + + if (! \array_key_exists($aggregateType, $this->aggregateDescriptions)) { + throw new InvalidArgumentException('Unknown aggregate type: ' . $aggregateType); + } + + $aggregateDesc = $this->aggregateDescriptions[$aggregateType]; + + $stream = $this->eventStore->loadAggregateEvents( + $aggregateDesc['aggregateStream'], + $aggregateType, + $aggregateId, + $minVersion, + $maxVersion + ); + + return new MapIterator($stream, function (GenericEvent $arEvent) { + return AggregateEventEnvelope::fromGenericEvent($arEvent, $this->flavour); + }); + } + + public function loadAggregateStateUntil(string $aggregateType, string $aggregateId, int $maxVersion = null) { $this->assertBootstrapped(__METHOD__); diff --git a/tests/EventEngineTestAbstract.php b/tests/EventEngineTestAbstract.php index 5991f52..11e064d 100644 --- a/tests/EventEngineTestAbstract.php +++ b/tests/EventEngineTestAbstract.php @@ -11,6 +11,7 @@ namespace EventEngineTest; +use EventEngine\Aggregate\AggregateEventEnvelope; use EventEngine\Commanding\CommandPreProcessor; use EventEngine\DocumentStore\DocumentStore; use EventEngine\DocumentStore\Exception\UnknownCollection; @@ -644,6 +645,108 @@ public function it_can_load_aggregate_state() $this->assertLoadedUserState($userState); } + /** + * @test + */ + public function it_can_load_aggregate_events() + { + $publishedEvents = []; + + $this->eventEngine->on(Event::USER_WAS_REGISTERED, function ($event) use (&$publishedEvents) { + $publishedEvents[] = $event; + }); + + $this->eventEngine->on(Event::USERNAME_WAS_CHANGED, function ($event) use (&$publishedEvents) { + $publishedEvents[] = $event; + }); + + $this->initializeEventEngine(); + $this->bootstrapEventEngine(); + + $userId = Uuid::uuid4()->toString(); + + $firstResult = $this->eventEngine->dispatch(Command::REGISTER_USER, [ + UserDescription::IDENTIFIER => $userId, + UserDescription::USERNAME => 'Alex', + UserDescription::EMAIL => 'contact@prooph.de', + ]); + + $secondResult = $this->eventEngine->dispatch(Command::CHANGE_USERNAME, [ + UserDescription::IDENTIFIER => $userId, + UserDescription::USERNAME => 'John', + ]); + + $userEvents = iterator_to_array($this->eventEngine->loadAggregateEvents('User', $userId)); + + self::assertCount(2, $userEvents); + self::assertCount(2, $publishedEvents); + + /** @var AggregateEventEnvelope $event */ + $event = $userEvents[1]; + self::assertEquals(Event::USERNAME_WAS_CHANGED, $event->eventName()); + self::assertEquals('User', $event->aggregateType()); + self::assertEquals($userId, $event->aggregateId()); + self::assertEquals(2, $event->aggregateVersion()); + self::assertEquals(get_class($publishedEvents[1]), get_class($event->event())); + self::assertEquals([ + UserDescription::IDENTIFIER => $userId, + 'oldName' => 'Alex', + 'newName' => 'John', + ], $event->rawPayload()); + self::assertArrayHasKey(GenericEvent::META_CAUSATION_ID, $event->metadata()); + } + + /** + * @test + */ + public function it_can_load_a_range_of_aggregate_events() + { + $publishedEvents = []; + + $this->eventEngine->on(Event::USER_WAS_REGISTERED, function ($event) use (&$publishedEvents) { + $publishedEvents[] = $event; + }); + + $this->eventEngine->on(Event::USERNAME_WAS_CHANGED, function ($event) use (&$publishedEvents) { + $publishedEvents[] = $event; + }); + + $this->initializeEventEngine(); + $this->bootstrapEventEngine(); + + $userId = Uuid::uuid4()->toString(); + + $firstResult = $this->eventEngine->dispatch(Command::REGISTER_USER, [ + UserDescription::IDENTIFIER => $userId, + UserDescription::USERNAME => 'Alex', + UserDescription::EMAIL => 'contact@prooph.de', + ]); + + $secondResult = $this->eventEngine->dispatch(Command::CHANGE_USERNAME, [ + UserDescription::IDENTIFIER => $userId, + UserDescription::USERNAME => 'John', + ]); + + $userEvents = iterator_to_array($this->eventEngine->loadAggregateEvents('User', $userId, 2, 3)); + + self::assertCount(1, $userEvents); + self::assertCount(2, $publishedEvents); + + /** @var AggregateEventEnvelope $event */ + $event = $userEvents[0]; + self::assertEquals(Event::USERNAME_WAS_CHANGED, $event->eventName()); + self::assertEquals('User', $event->aggregateType()); + self::assertEquals($userId, $event->aggregateId()); + self::assertEquals(2, $event->aggregateVersion()); + self::assertEquals(get_class($publishedEvents[1]), get_class($event->event())); + self::assertEquals([ + UserDescription::IDENTIFIER => $userId, + 'oldName' => 'Alex', + 'newName' => 'John', + ], $event->rawPayload()); + self::assertArrayHasKey(GenericEvent::META_CAUSATION_ID, $event->metadata()); + } + /** * @test */