Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
Cleanup and prepare for release (#2)
Browse files Browse the repository at this point in the history
* Clean up events

* Added changelog and metadata
  • Loading branch information
Nyholm authored Jun 28, 2019
1 parent 5260394 commit 9f9e361
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 79 deletions.
16 changes: 16 additions & 0 deletions .php_cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

$finder = PhpCsFixer\Finder::create()
->in(__DIR__.'/src')
;

return PhpCsFixer\Config::create()
->setRiskyAllowed(true)
->setRules([
'@Symfony' => true,
'no_superfluous_phpdoc_tags' => true,
'final_class' => true,
'array_syntax' => ['syntax' => 'short'],
])
->setFinder($finder)
;
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Change Log

## 0.2.0

- Initial release
3 changes: 2 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ SNS messages and gives them to `App\Consumer\SnsConsumer`.

#### src/Consumer.php

This class is responsible to decode the SNS message and put it back on the message bus.
This class is responsible to decode the SNS message and put it back on the message bus. It dispatches events which one can
subscribe to in order to modify behavior. Ie, retry failed events.

#### config/sns-consumer.yaml

Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
],
"require": {
"enqueue/sns": "^0.9.12",
"sroze/messenger-enqueue-transport": "^0.3.0"
"sroze/messenger-enqueue-transport": "^0.3.0",
"symfony/event-dispatcher-contracts": "^1.1"
},
"autoload": {
"psr-4": {
Expand Down
19 changes: 9 additions & 10 deletions config/sns-consumer.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,3 @@
services:
Bref\MessengerSns\Consumer:
public: true
arguments:
- '@messenger.routable_message_bus'
- '@messenger.default_serializer'
- 'sns' # same as framework.messenger.transports.*sns*
- '@logger'
- '@event_dispatcher'

framework:
messenger:
transports:
Expand All @@ -25,3 +15,12 @@ enqueue:
region: '%env(AWS_TARGET_REGION)%'
client: ~

services:
Bref\MessengerSns\Consumer:
public: true
arguments:
- '@messenger.routable_message_bus'
- '@messenger.default_serializer'
- 'sns' # same as framework.messenger.transports.*sns*
- '@event_dispatcher'
- '@logger'
35 changes: 22 additions & 13 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

use Bref\MessengerSns\Event\SnsMessageDecodeFailed;
use Bref\MessengerSns\Event\SnsMessageFailed;
use Bref\MessengerSns\Event\SnsMessageHandled;
use Bref\MessengerSns\Event\SnsMessageReceived;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -22,33 +23,38 @@ final class Consumer
private $bus;
private $serializer;
private $transportName;
private $logger;
private $eventDispatcher;
private $logger;

public function __construct(
MessageBusInterface $bus,
SerializerInterface $serializer,
string $transportName = 'sns',
LoggerInterface $logger = null,
EventDispatcherInterface $eventDispatcher = null
EventDispatcherInterface $eventDispatcher = null,
LoggerInterface $logger = null
) {

$this->bus = $bus;
$this->serializer = $serializer;
$this->transportName = $transportName;
$this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
$this->logger = $logger;
}

public function consume(array $snsEvent)
{
$sfEvent = new SnsMessageReceived($snsEvent, $this->transportName);
$this->dispatchEvent($sfEvent);
if (!$sfEvent->shouldHandle()) {
return;
}

try {
$envelope = $this->serializer->decode(['body' => $snsEvent['Message']]);
} catch (\Throwable $e) {
$this->dispatchEvent(new SnsMessageDecodeFailed($snsEvent, $e, $this->transportName));
}

$sfEvent = new SnsMessageReceived($snsEvent, $envelope, $this->transportName);
$sfEvent = new WorkerMessageReceivedEvent($envelope, $this->transportName);
$this->dispatchEvent($sfEvent);

if (!$sfEvent->shouldHandle()) {
Expand All @@ -58,17 +64,20 @@ public function consume(array $snsEvent)
try {
$this->bus->dispatch($envelope->with(new ReceivedStamp($this->transportName)));
} catch (\Throwable $e) {
$this->logger->critical('Could not consume message from SNS.', [
'exception' => $e,
'category' => 'sns',
]);
if ($this->logger !== null) {
$this->logger->critical('Could not consume message from SNS.', [
'exception' => $e,
'category' => 'sns',
]);
}

$this->dispatchEvent(new SnsMessageFailed($snsEvent, $envelope, $this->transportName, $e, /* $willRetry */ false));
$this->dispatchEvent(new SnsMessageFailed($snsEvent, $envelope, $this->transportName, $e));

// Start handle next event.
return;
}

$this->dispatchEvent(new SnsMessageHandled($snsEvent, $envelope, $this->transportName));
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->transportName));
}

private function dispatchEvent($event)
Expand Down
18 changes: 12 additions & 6 deletions src/Event/SnsMessageDecodeFailed.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

namespace Bref\MessengerSns\Event;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;

class SnsMessageDecodeFailed
/**
* Thrown when a SNS event could not be decoded.
*
* @author Tobias Nyholm <[email protected]>
*/
final class SnsMessageDecodeFailed
{
use SnsMessageTrait;
private $snsEvent;
private $throwable;
private $receiverName;

Expand All @@ -21,6 +22,11 @@ public function __construct(array $snsEvent, \Throwable $throwable, string $rece
$this->receiverName = $receiverName;
}

public function getSnsEvent(): array
{
return $this->snsEvent;
}

public function getThrowable(): \Throwable
{
return $this->throwable;
Expand Down
39 changes: 33 additions & 6 deletions src/Event/SnsMessageFailed.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,44 @@
namespace Bref\MessengerSns\Event;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;

class SnsMessageFailed extends WorkerMessageFailedEvent
/**
* An SNS messages failed to be handled properly. Subscribers to this event should handle retry.
*
* @author Tobias Nyholm <[email protected]>
*/
final class SnsMessageFailed
{
use SnsMessageTrait;
private $snsEvent;
private $envelope;
private $receiverName;
private $throwable;

public function __construct(array $snsEvent, Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
public function __construct(array $snsEvent, Envelope $envelope, string $receiverName, \Throwable $error)
{
$this->snsEvent = $snsEvent;
$this->envelope = $envelope;
$this->receiverName = $receiverName;
$this->throwable = $error;
}

public function getSnsEvent(): array
{
return $this->snsEvent;
}

parent::__construct($envelope, $receiverName, $error, $willRetry);
public function getEnvelope(): Envelope
{
return $this->envelope;
}

public function getReceiverName(): string
{
return $this->receiverName;
}

public function getThrowable(): \Throwable
{
return $this->throwable;
}
}
20 changes: 0 additions & 20 deletions src/Event/SnsMessageHandled.php

This file was deleted.

37 changes: 30 additions & 7 deletions src/Event/SnsMessageReceived.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,39 @@

namespace Bref\MessengerSns\Event;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;

class SnsMessageReceived extends WorkerMessageReceivedEvent
/**
* A raw SNS message was received. Use this event to decide if we want to handle this SNS message or not.
*
* @author Tobias Nyholm <[email protected]>
*/
final class SnsMessageReceived
{
use SnsMessageTrait;
private $snsEvent;
private $receiverName;
private $shouldHandle = true;

public function __construct(array $snsEvent, Envelope $envelope, string $receiverName)
public function __construct(array $snsEvent, string $receiverName)
{
$this->receiverName = $receiverName;
$this->snsEvent = $snsEvent;
parent::__construct($envelope, $receiverName);
}

public function shouldHandle(bool $shouldHandle = null): bool
{
if (null !== $shouldHandle) {
$this->shouldHandle = $shouldHandle;
}

return $this->shouldHandle;
}

public function getSnsEvent(): array
{
return $this->snsEvent;
}

public function getReceiverName(): string
{
return $this->receiverName;
}
}
15 changes: 0 additions & 15 deletions src/Event/SnsMessageTrait.php

This file was deleted.

0 comments on commit 9f9e361

Please sign in to comment.