Skip to content

Commit

Permalink
Merge pull request #6 from gowrizrh/cloud-sinks
Browse files Browse the repository at this point in the history
Add AWS SQS and GCP Pub/Sub sinks
  • Loading branch information
Vinai authored Jul 22, 2024
2 parents b898a4f + 63b9bd7 commit 876c522
Show file tree
Hide file tree
Showing 21 changed files with 1,619 additions and 97 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/split.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ jobs:
# - local_path: 'AsyncEventsAzure'
# split_repository: 'mageos-async-events-azure'

# - local_path: 'AsyncEventsGCP'
# split_repository: 'mageos-async-events-gcp'
- local_path: 'AsyncEventsGCP'
split_repository: 'mageos-async-events-gcp'

steps:
- uses: actions/checkout@v4
Expand Down
40 changes: 30 additions & 10 deletions MageOS/AsyncEventsAWS/Model/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@

class Config
{
const XML_PATH_AWS_ACCESS_KEY = 'async_events_aws/eventbridge/access_key';
const XML_PATH_AWS_SECRET_ACCESS_KEY = 'async_events_aws/eventbridge/secret_access_key';
const XML_PATH_AWS_REGION = 'async_events_aws/eventbridge/region';
const XML_PATH_EVENT_SOURCE = 'async_events_aws/eventbridge/source';
private const XML_PATH_AWS_ACCESS_KEY = 'async_events_aws/eventbridge/access_key';
private const XML_PATH_AWS_SECRET_ACCESS_KEY = 'async_events_aws/eventbridge/secret_access_key';
private const XML_PATH_AWS_REGION = 'async_events_aws/eventbridge/region';
private const XML_PATH_EVENT_SOURCE = 'async_events_aws/eventbridge/source';

/**
* Config constructor.
*
* @param StoreManagerInterface $storeManager
* @param ScopeConfigInterface $scopeConfig
*/
Expand All @@ -27,27 +25,49 @@ public function __construct(
) {
}

public function getAccessKey()
/**
* Get Access Key
*
* @return string|null
*/
public function getAccessKey(): ?string
{
return $this->scopeConfig->getValue(self::XML_PATH_AWS_ACCESS_KEY, ScopeInterface::SCOPE_STORES);
}

public function getSecretAccessKey()
/**
* Get Secret Access Key
*
* @return string|null
*/
public function getSecretAccessKey(): ?string
{
return $this->scopeConfig->getValue(self::XML_PATH_AWS_SECRET_ACCESS_KEY, ScopeInterface::SCOPE_STORES);
}

public function getRegion()
/**
* Get AWS region
*
* @return string|null
*/
public function getRegion(): ?string
{
return $this->scopeConfig->getValue(self::XML_PATH_AWS_REGION, ScopeInterface::SCOPE_STORES);
}

public function getSource()
/**
* Get source for EventBridge
*
* @return string|null
* @throws \Magento\Framework\Exception\NoSuchEntityException
*/
public function getSource(): ?string
{
$source = $this->scopeConfig->getValue(self::XML_PATH_EVENT_SOURCE, ScopeInterface::SCOPE_STORES);
if ($source == null) {
$url = $this->storeManager->getStore()->getBaseUrl();
if ($url !== null) {
// phpcs:disable Magento2.Functions.DiscouragedFunction.Discouraged
return parse_url($url, PHP_URL_HOST);
}
}
Expand Down
52 changes: 52 additions & 0 deletions MageOS/AsyncEventsAWS/Model/SQSConfig.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

declare(strict_types=1);

namespace MageOS\AsyncEventsAWS\Model;

use Magento\Framework\App\Config\ScopeConfigInterface;

class SQSConfig
{
private const XML_PATH_AWS_ACCESS_KEY = 'async_events_aws/sqs/access_key';
private const XML_PATH_AWS_SECRET_ACCESS_KEY = 'async_events_aws/sqs/secret_access_key';
private const XML_PATH_AWS_REGION = 'async_events_aws/sqs/region';

/**
* @param ScopeConfigInterface $scopeConfig
*/
public function __construct(
private readonly ScopeConfigInterface $scopeConfig
) {
}

/**
* Get Access Key
*
* @return string|null
*/
public function getAccessKey(): ?string
{
return $this->scopeConfig->getValue(self::XML_PATH_AWS_ACCESS_KEY);
}

/**
* Get Secret Access Key
*
* @return string|null
*/
public function getSecretAccessKey(): ?string
{
return $this->scopeConfig->getValue(self::XML_PATH_AWS_SECRET_ACCESS_KEY);
}

/**
* Get AWS region
*
* @return string|null
*/
public function getRegion(): ?string
{
return $this->scopeConfig->getValue(self::XML_PATH_AWS_REGION);
}
}
46 changes: 38 additions & 8 deletions MageOS/AsyncEventsAWS/README.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
# MageOS Async Events AWS

AWS destinations for [mageos-async-events](https://github.com/mage-os/mageos-async-events)
AWS event sinks for [mageos-async-events](https://github.com/mage-os/mageos-async-events)

## Installation

```sh
composer require mage-os/mageos-async-events-aws
```

## Supported AWS event sinks
## AWS event sinks

* EventBridge: send events to an Amazon EventBridge bus
### Amazon EventBridge

### Configure AWS Credentials
**Configure AWS Credentials**

An IAM role with the `events:PutEvents` action is required so that the notifier can relay events into Amazon
EventBridge.

Under `Stores -> Services -> Async Events AWS` set the `Access Key ID` and the `Secret Access Key` and the `Region`. You
Under `Stores -> Services -> Async Events AWS` set the `Access Key` and the `Secret Access Key` and the `Region`. You
can also choose to configure the source of the event.

![AWS Config](./docs/config.png)

### Create a Subscription
**Create an EventBridge Subscription**

The following is an example to create an EventBridge subscription for the `example.event`

The following is an example to create an EventBridge subscription for the `example.event`
```shell
curl --location --request POST 'https://test.mageos.dev/rest/V1/async_event' \
--header 'Authorization: Bearer TOKEN' \
Expand All @@ -39,7 +40,36 @@ curl --location --request POST 'https://test.mageos.dev/rest/V1/async_event' \
}'
```

### Amazon Simple Queue Service

**Configure AWS Credentials**

An IAM role with the `sqs:SendMessage` action is required so that the notifier can relay events into Amazon
SQS.

Under `Stores -> Services -> Async Events AWS` set the `Access Key` and the `Secret Access Key` and the `Region`.

> [!NOTE]
> The maximum message size for SQS is 262,144 bytes (256 KiB)
**Create an SQS Subscription**

```shell
curl --location --request POST 'https://test.mageos.dev/rest/V1/async_event' \
--header 'Authorization: Bearer TOKEN' \
--header 'Content-Type: application/json' \
--data-raw '{
"asyncEvent": {
"event_name": "example.event",
"recipient_url": "Amazon SQS queue URL",
"verification_token": "supersecret",
"metadata": "sqs"
}
}'
```

## Contributing

This is a repository for distribution only.
Contributions are welcome on the development repository [mageos-async-events-sinks](https://github.com/mage-os/mageos-async-events-sinks)
Contributions are welcome on the development
repository [mageos-async-events-sinks](https://github.com/mage-os/mageos-async-events-sinks)
2 changes: 1 addition & 1 deletion MageOS/AsyncEventsAWS/Service/EventBridge.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

class EventBridge implements NotifierInterface
{
/** @var EventBridgeClient|null */
private ?EventBridgeClient $eventBridgeClient = null;

/**
Expand All @@ -31,7 +32,6 @@ public function __construct(
private readonly Normalizer $normalizer,
private readonly EncryptorInterface $encryptor,
private readonly SerializerInterface $serializer,

) {
}

Expand Down
129 changes: 129 additions & 0 deletions MageOS/AsyncEventsAWS/Service/SQS.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
<?php

declare(strict_types=1);

namespace MageOS\AsyncEventsAWS\Service;

use Aws\Exception\AwsException;
use Aws\Sqs\SqsClient;
use CloudEvents\Serializers\JsonSerializer;
use CloudEvents\Serializers\Normalizers\V1\Normalizer;
use CloudEvents\V1\CloudEventImmutable;
use Exception;
use Magento\Framework\Encryption\EncryptorInterface;
use Magento\Framework\Serialize\SerializerInterface;
use MageOS\AsyncEvents\Api\Data\AsyncEventInterface;
use MageOS\AsyncEvents\Api\Data\ResultInterface;
use MageOS\AsyncEvents\Helper\NotifierResult;
use MageOS\AsyncEvents\Helper\NotifierResultFactory;
use MageOS\AsyncEvents\Service\AsyncEvent\NotifierInterface;
use MageOS\AsyncEventsAWS\Model\SQSConfig;

class SQS implements NotifierInterface
{
/** @var SqsClient|null */
private ?SqsClient $sqsClient = null;

/**
* @param NotifierResultFactory $notifierResultFactory
* @param EncryptorInterface $encryptor
* @param SerializerInterface $serializer
* @param Normalizer $normalizer
* @param SQSConfig $config
*/
public function __construct(
private readonly NotifierResultFactory $notifierResultFactory,
private readonly EncryptorInterface $encryptor,
private readonly SerializerInterface $serializer,
private readonly Normalizer $normalizer,
private readonly SQSConfig $config
) {
}

/**
* @inheritDoc
*/
public function notify(AsyncEventInterface $asyncEvent, CloudEventImmutable $event): ResultInterface
{
/** @var NotifierResult $result */
$result = $this->notifierResultFactory->create();
$result->setSubscriptionId($asyncEvent->getSubscriptionId());
$result->setAsyncEventData($this->normalizer->normalize($event, false));
$result->setIsSuccessful(false);
$result->setIsRetryable(false);

$params = [
'MessageBody' => JsonSerializer::create()->serializeStructured($event),
'QueueUrl' => $asyncEvent->getRecipientUrl()
];

try {
$client = $this->getClient();

if (!$client) {
$result->setResponseData('SQS connection is not configured.');

return $result;
}

$sqsResult = $client->sendMessage($params);
$result->setResponseData(
$this->serializer->serialize([
'MessageId' => $sqsResult->get('MessageId'),
])
);
$result->setIsSuccessful(true);

} catch (AwsException $awsException) {
$code = $awsException->getAwsErrorCode();

// The PHP SDK doesn't throw named exceptions based on error codes, however the official go sdk maps the
// service error codes to the internal SQS error codes.
// https://github.com/aws/aws-sdk-go/blob/main/service/sqs/errors.go
$retryable = match ($code) {
'RequestThrottled', 'KmsDisabled', 'KmsThrottled' => true,
default => false,
};
$result->setIsRetryable($retryable);

$result->setResponseData($this->serializer->serialize([
'code' => $awsException->getAwsErrorCode(),
'message' => $awsException->getMessage()
]));

} catch (Exception $exception) {
$result->setResponseData(
$exception->getMessage()
);
}

return $result;
}

/**
* Instantiate and return SqsClient
*
* @return SqsClient|null
*/
private function getClient(): ?SqsClient
{
if ($this->sqsClient === null) {
$region = $this->config->getRegion();
$key = $this->config->getAccessKey();
$secret = $this->config->getSecretAccessKey();

if ($key !== null && $secret !== null) {

$this->sqsClient = new SqsClient([
'region' => $region,
'credentials' => [
'key' => $key,
'secret' => $this->encryptor->decrypt($secret)
]
]);
}
}

return $this->sqsClient;
}
}
3 changes: 2 additions & 1 deletion MageOS/AsyncEventsAWS/composer.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"name": "mage-os/mageos-async-events-aws",
"description": "AWS event sinks for mage-os/mageos-async-events",
"type": "magento2-module",
"require": {
"php": ">=8.1",
"aws/aws-sdk-php": "^3.305",
"aws/aws-sdk-php": "^3.0",
"mage-os/mageos-async-events": "^4.0"
},
"license": [
Expand Down
Loading

0 comments on commit 876c522

Please sign in to comment.