Skip to content

Commit

Permalink
Merge pull request event-engine#18 from event-engine/feature/load_agg…
Browse files Browse the repository at this point in the history
…regate_events

Load aggregate events
  • Loading branch information
codeliner authored Sep 15, 2019
2 parents 54795d2 + 8fe9163 commit 56d84e6
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 0 deletions.
146 changes: 146 additions & 0 deletions src/Aggregate/AggregateEventEnvelope.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
<?php
declare(strict_types=1);

namespace EventEngine\Aggregate;

use EventEngine\Messaging\GenericEvent;
use EventEngine\Messaging\MessageBag;
use EventEngine\Runtime\Flavour;

final class AggregateEventEnvelope
{
/**
* @var string
*/
private $eventName;

/**
* @var mixed
*/
private $event;

/**
* @var string
*/
private $aggregateId;

/**
* @var string
*/
private $aggregateType;

/**
* @var int
*/
private $aggregateVersion;

/**
* @var array
*/
private $rawPayload;

/**
* @var array
*/
private $metadata;

public static function fromGenericEvent(GenericEvent $event, Flavour $flavour): self
{
$aggregateType = $event->getMeta(GenericEvent::META_AGGREGATE_TYPE);
$aggregateId = $event->getMeta(GenericEvent::META_AGGREGATE_ID);
$aggregateVersion = $event->getMeta(GenericEvent::META_AGGREGATE_VERSION);
$metadata = $event->metadata();
$rawPayload = $event->payload();
$eventName = $event->messageName();

$event = $flavour->convertMessageReceivedFromNetwork($event, true);

if($event instanceof MessageBag) {
$event = $event->get(MessageBag::MESSAGE);
}

return new self(
$eventName,
$aggregateType,
$aggregateId,
$aggregateVersion,
$event,
$rawPayload,
$metadata
);
}

private function __construct(
string $eventName,
string $aggregateType,
string $aggregateId,
int $aggregateVersion,
object $event,
array $rawPayload,
array $metadata
) {
$this->eventName = $eventName;
$this->aggregateType = $aggregateType;
$this->aggregateId = $aggregateId;
$this->aggregateVersion = $aggregateVersion;
$this->event = $event;
$this->rawPayload = $rawPayload;
$this->metadata = $metadata;
}

/**
* @return string
*/
public function eventName(): string
{
return $this->eventName;
}

/**
* @return mixed
*/
public function event()
{
return $this->event;
}

/**
* @return string
*/
public function aggregateId(): string
{
return $this->aggregateId;
}

/**
* @return string
*/
public function aggregateType(): string
{
return $this->aggregateType;
}

/**
* @return int
*/
public function aggregateVersion(): int
{
return $this->aggregateVersion;
}

/**
* @return array
*/
public function rawPayload(): array
{
return $this->rawPayload;
}

/**
* @return array
*/
public function metadata(): array
{
return $this->metadata;
}
}
39 changes: 39 additions & 0 deletions src/EventEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace EventEngine;

use EventEngine\Aggregate\AggregateEventEnvelope;
use EventEngine\Aggregate\Exception\AggregateNotFound;
use EventEngine\Aggregate\FlavouredAggregateRoot;
use EventEngine\Aggregate\GenericAggregateRepository;
Expand All @@ -34,6 +35,7 @@
use EventEngine\Messaging\GenericEvent;
use EventEngine\Messaging\GenericSchemaMessageFactory;
use EventEngine\Messaging\Message;
use EventEngine\Messaging\MessageBag;
use EventEngine\Messaging\MessageDispatcher;
use EventEngine\Messaging\MessageFactory;
use EventEngine\Messaging\MessageFactoryAware;
Expand Down Expand Up @@ -64,6 +66,7 @@
use EventEngine\Schema\Schema;
use EventEngine\Schema\TypeSchema;
use EventEngine\Schema\TypeSchemaMap;
use EventEngine\Util\MapIterator;
use EventEngine\Util\MessageTuple;
use EventEngine\Util\VariableType;
use Psr\Container\ContainerInterface;
Expand Down Expand Up @@ -1035,6 +1038,42 @@ public function loadAggregateState(string $aggregateType, string $aggregateId, i
return $aggregate->currentState();
}

/**
* @param string $aggregateType
* @param string $aggregateId
* @param int $minVersion
* @param int|null $maxVersion
* @return \Iterator<AggregateEventEnvelope>
*/
public function loadAggregateEvents(
string $aggregateType,
string $aggregateId,
int $minVersion = 1,
int $maxVersion = null
): \Iterator
{
$this->assertBootstrapped(__METHOD__);

if (! \array_key_exists($aggregateType, $this->aggregateDescriptions)) {
throw new InvalidArgumentException('Unknown aggregate type: ' . $aggregateType);
}

$aggregateDesc = $this->aggregateDescriptions[$aggregateType];

$stream = $this->eventStore->loadAggregateEvents(
$aggregateDesc['aggregateStream'],
$aggregateType,
$aggregateId,
$minVersion,
$maxVersion
);

return new MapIterator($stream, function (GenericEvent $arEvent) {
return AggregateEventEnvelope::fromGenericEvent($arEvent, $this->flavour);
});
}


public function loadAggregateStateUntil(string $aggregateType, string $aggregateId, int $maxVersion = null)
{
$this->assertBootstrapped(__METHOD__);
Expand Down
103 changes: 103 additions & 0 deletions tests/EventEngineTestAbstract.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace EventEngineTest;

use EventEngine\Aggregate\AggregateEventEnvelope;
use EventEngine\Commanding\CommandPreProcessor;
use EventEngine\DocumentStore\DocumentStore;
use EventEngine\DocumentStore\Exception\UnknownCollection;
Expand Down Expand Up @@ -644,6 +645,108 @@ public function it_can_load_aggregate_state()
$this->assertLoadedUserState($userState);
}

/**
* @test
*/
public function it_can_load_aggregate_events()
{
$publishedEvents = [];

$this->eventEngine->on(Event::USER_WAS_REGISTERED, function ($event) use (&$publishedEvents) {
$publishedEvents[] = $event;
});

$this->eventEngine->on(Event::USERNAME_WAS_CHANGED, function ($event) use (&$publishedEvents) {
$publishedEvents[] = $event;
});

$this->initializeEventEngine();
$this->bootstrapEventEngine();

$userId = Uuid::uuid4()->toString();

$firstResult = $this->eventEngine->dispatch(Command::REGISTER_USER, [
UserDescription::IDENTIFIER => $userId,
UserDescription::USERNAME => 'Alex',
UserDescription::EMAIL => '[email protected]',
]);

$secondResult = $this->eventEngine->dispatch(Command::CHANGE_USERNAME, [
UserDescription::IDENTIFIER => $userId,
UserDescription::USERNAME => 'John',
]);

$userEvents = iterator_to_array($this->eventEngine->loadAggregateEvents('User', $userId));

self::assertCount(2, $userEvents);
self::assertCount(2, $publishedEvents);

/** @var AggregateEventEnvelope $event */
$event = $userEvents[1];
self::assertEquals(Event::USERNAME_WAS_CHANGED, $event->eventName());
self::assertEquals('User', $event->aggregateType());
self::assertEquals($userId, $event->aggregateId());
self::assertEquals(2, $event->aggregateVersion());
self::assertEquals(get_class($publishedEvents[1]), get_class($event->event()));
self::assertEquals([
UserDescription::IDENTIFIER => $userId,
'oldName' => 'Alex',
'newName' => 'John',
], $event->rawPayload());
self::assertArrayHasKey(GenericEvent::META_CAUSATION_ID, $event->metadata());
}

/**
* @test
*/
public function it_can_load_a_range_of_aggregate_events()
{
$publishedEvents = [];

$this->eventEngine->on(Event::USER_WAS_REGISTERED, function ($event) use (&$publishedEvents) {
$publishedEvents[] = $event;
});

$this->eventEngine->on(Event::USERNAME_WAS_CHANGED, function ($event) use (&$publishedEvents) {
$publishedEvents[] = $event;
});

$this->initializeEventEngine();
$this->bootstrapEventEngine();

$userId = Uuid::uuid4()->toString();

$firstResult = $this->eventEngine->dispatch(Command::REGISTER_USER, [
UserDescription::IDENTIFIER => $userId,
UserDescription::USERNAME => 'Alex',
UserDescription::EMAIL => '[email protected]',
]);

$secondResult = $this->eventEngine->dispatch(Command::CHANGE_USERNAME, [
UserDescription::IDENTIFIER => $userId,
UserDescription::USERNAME => 'John',
]);

$userEvents = iterator_to_array($this->eventEngine->loadAggregateEvents('User', $userId, 2, 3));

self::assertCount(1, $userEvents);
self::assertCount(2, $publishedEvents);

/** @var AggregateEventEnvelope $event */
$event = $userEvents[0];
self::assertEquals(Event::USERNAME_WAS_CHANGED, $event->eventName());
self::assertEquals('User', $event->aggregateType());
self::assertEquals($userId, $event->aggregateId());
self::assertEquals(2, $event->aggregateVersion());
self::assertEquals(get_class($publishedEvents[1]), get_class($event->event()));
self::assertEquals([
UserDescription::IDENTIFIER => $userId,
'oldName' => 'Alex',
'newName' => 'John',
], $event->rawPayload());
self::assertArrayHasKey(GenericEvent::META_CAUSATION_ID, $event->metadata());
}

/**
* @test
*/
Expand Down

0 comments on commit 56d84e6

Please sign in to comment.