Skip to content

Commit

Permalink
full list of changes below:
Browse files Browse the repository at this point in the history
- do not commit any changes after command has failed
- try to cleanup transaction from (possibly) corrupted aggregates
  • Loading branch information
alanbem committed Sep 23, 2019
1 parent 68c4446 commit 75b05e8
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 7 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"php": ">=7.1",
"ext-PDO": "^7.1",
"ext-igbinary": "^2.0|^3.0",
"ext-redis": "^4.0",
"ext-redis": "^4.0|^5.0",
"doctrine/dbal": "^2.8",
"psr/log": "~1.0",
"ramsey/uuid": "^3.7"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Streak\Infrastructure\CommandHandler;

use Streak\Application;
use Streak\Domain\Event\Producer;
use Streak\Infrastructure;

/**
Expand All @@ -39,8 +40,42 @@ public function __construct(Application\CommandHandler $handler, Infrastructure\

public function handle(Application\Command $command) : void
{
$this->handler->handle($command);
$producersInTransactionBeforeCommand = $this->uow->uncommitted();
try {
$this->handler->handle($command);
iterator_to_array($this->uow->commit());
} catch (\Throwable $e) {
$producersInTransactionAfterCommand = $this->uow->uncommitted();
$producersAddedToTransactionWithinCommand = $this->findProducersAddedWhileHandlingCommand($producersInTransactionBeforeCommand, $producersInTransactionAfterCommand);

iterator_to_array($this->uow->commit());
foreach ($producersAddedToTransactionWithinCommand as $producer) {
$this->uow->remove($producer);
}

throw $e;
}
}

/**
* Basically finds producers that are present in $producersInTransactionBeforeCommand, but not in $producersInTransactionAfterCommand.
*
* @param Producer[] $producersInTransactionBeforeCommand
* @param Producer[] $producersInTransactionAfterCommand
*
* @return Producer[]
*/
private function findProducersAddedWhileHandlingCommand(array $producersInTransactionBeforeCommand, array $producersInTransactionAfterCommand) : array
{
$producersAddedToTransactionWithinCommand = [];
foreach ($producersInTransactionAfterCommand as $producerFromAfterCommandHandled) {
foreach ($producersInTransactionBeforeCommand as $producerFromBeforeCommandHandled) {
if ($producerFromAfterCommandHandled->producerId()->equals($producerFromBeforeCommandHandled->producerId())) {
continue 2; // next producer
}
}
$producersAddedToTransactionWithinCommand[] = $producerFromAfterCommandHandled;
}

return $producersAddedToTransactionWithinCommand;
}
}
4 changes: 2 additions & 2 deletions tests/Domain/Event/Listener/CommandingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public function testReplayingEmptyStream()
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
new Command1(),
new Command3()
[new Command1()],
[new Command3()]
)
;
$commander = new CommandingStub($this->bus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ public function setUp()
$this->aggregateRoot1 = $this->getMockBuilder(Event\Sourced\AggregateRoot::class)->getMockForAbstractClass();
$this->aggregateRoot2 = $this->getMockBuilder(Event\Sourced\AggregateRoot::class)->getMockForAbstractClass();

$this->aggregateRootId1 = $this->getMockBuilder(Domain\AggregateRoot\Id::class)->getMockForAbstractClass();
$this->aggregateRootId2 = $this->getMockBuilder(Domain\AggregateRoot\Id::class)->getMockForAbstractClass();
$this->aggregateRootId1 = new Infrastructure\CommandHandler\TransactionalPersistenceCommandHandlerTest\ProducerId('6448c6b7-bd5d-4a04-97a9-c1ad99008c04');
$this->aggregateRootId2 = new Infrastructure\CommandHandler\TransactionalPersistenceCommandHandlerTest\ProducerId('9535ad9e-58c9-4bb6-82cf-843ae04f8f48');

$this->aggregateRoot1->expects($this->any())->method('producerId')->willReturn($this->aggregateRootId1);
$this->aggregateRoot2->expects($this->any())->method('producerId')->willReturn($this->aggregateRootId2);

$this->producer1 = $this->getMockBuilder(Event\Producer::class)->getMockForAbstractClass();
$this->producer2 = $this->getMockBuilder(Event\Producer::class)->getMockForAbstractClass();
Expand Down Expand Up @@ -117,10 +120,55 @@ public function testHandlingCommand()
$handler->handle($this->command);
}

public function testException()
{
$exception = new \Exception();
$this->expectExceptionObject($exception);

$handler = new TransactionalPersistenceCommandHandler($this->handler, $this->uow);

$this->handler
->expects($this->once())
->method('handle')
->with($this->command)
->willThrowException($exception);

$this->uow
->expects($this->never())
->method('commit')
;

$this->uow
->expects($this->exactly(2))
->method('uncommitted')
->willReturnOnConsecutiveCalls(
[$this->aggregateRoot1],
[$this->aggregateRoot1, $this->aggregateRoot2]
)
;

$this->uow
->expects($this->once())
->method('remove')
->with($this->aggregateRoot2)
;

$handler->handle($this->command);
}

public function committed(Event\Producer ...$producers) : \Generator
{
foreach ($producers as $producer) {
yield $producer;
}
}
}

namespace Streak\Infrastructure\CommandHandler\TransactionalPersistenceCommandHandlerTest;

use Streak\Domain\AggregateRoot;
use Streak\Domain\Id\UUID;

class ProducerId extends UUID implements AggregateRoot\Id
{
}

0 comments on commit 75b05e8

Please sign in to comment.