Skip to content

Commit

Permalink
Support client side rate limiting (#20)
Browse files Browse the repository at this point in the history
* chore: add retry interfaces

* 🚧 chore: setup support client side rate limiting

* chore: add comments, check for delay seconds

* 💚 ci: update workflow file

* 🔧 chore: add changes to api workflows

* 🧪 actually run events retry test

the test had been silently failing in the background
because the queue consumer started did not have
the di preference overriden for Config to TestConfig.

because that always happened in a separate process, it
never came to light that we weren't actually asserting
that retries had been performed in the background.
  • Loading branch information
gowrizrh authored Mar 4, 2024
1 parent 1fa940a commit 5ea908b
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 44 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/api-functional-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jobs:

# Only run api tests on feature branches. Simple documentation updates, formatting can be ignored
if: contains(github.head_ref, 'feature')
name: ${{ matrix.job_title }}
strategy:
fail-fast: true
matrix:
Expand All @@ -25,6 +26,8 @@ jobs:
redis: "redis:6.2"
varnish: "varnish:7.0"
nginx: "nginx:1.18"
job_title: "2.4.5"

- magento: magento/project-community-edition:>=2.4.5 <2.4.6
php: 8.1
composer: 2
Expand All @@ -34,6 +37,7 @@ jobs:
redis: "redis:6.2"
varnish: "varnish:7.0"
nginx: "nginx:1.18"
job_title: "2.4.6"

services:
elasticsearch:
Expand Down Expand Up @@ -68,7 +72,7 @@ jobs:
- 5672:5672
- 15672:15672
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set PHP Version
uses: shivammathur/setup-php@v2
with:
Expand Down
8 changes: 7 additions & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jobs:

# Only run integration tests on feature branches. Simple documentation updates, formatting can be ignored
if: contains(github.head_ref, 'feature')
name: ${{ matrix.job_title }}
strategy:
fail-fast: true
matrix:
Expand All @@ -25,6 +26,7 @@ jobs:
redis: "redis:6.2"
varnish: "varnish:7.0"
nginx: "nginx:1.18"
job_title: "2.4.5"
- magento: magento/project-community-edition:>=2.4.5 <2.4.6
php: 8.1
composer: 2
Expand All @@ -34,6 +36,7 @@ jobs:
redis: "redis:6.2"
varnish: "varnish:7.0"
nginx: "nginx:1.18"
job_title: "2.4.6"

services:
elasticsearch:
Expand Down Expand Up @@ -68,7 +71,7 @@ jobs:
- 5672:5672
- 15672:15672
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set PHP Version
uses: shivammathur/setup-php@v2
with:
Expand Down Expand Up @@ -128,6 +131,9 @@ jobs:
sed -i "s/'db-password' => '123123q'/'db-password' => 'password'/" etc/install-config-mysql.php.dist
sed -i "s/'elasticsearch-host' => 'localhost'/'elasticsearch-host' => '127.0.0.1'/" etc/install-config-mysql.php.dist
sed -i "s/'amqp-host' => 'localhost'/'amqp-host' => '127.0.0.1'/" etc/install-config-mysql.php.dist
sed -i "s/'consumers-wait-for-messages' => '0'/'consumers-wait-for-messages' => '1'/" etc/install-config-mysql.php.dist
mkdir etc/di/preferences/cli
cp ../../../vendor/mage-os/mageos-async-events/Test/_files/ce.php ./etc/di/preferences/cli
- run: ../../../vendor/bin/phpunit ../../../vendor/mage-os/mageos-async-events/Test/Integration
working-directory: ../magento2/dev/tests/integration
Expand Down
53 changes: 53 additions & 0 deletions Api/Data/ResultInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

declare(strict_types=1);

namespace MageOS\AsyncEvents\Api\Data;

interface ResultInterface
{
/**
* Getter for is_successful
*
* @return bool
*/
public function getIsSuccessful(): bool;

/**
* Setter for is_successful
*
* @param bool $isSuccessful
* @return void
*/
public function setIsSuccessful(bool $isSuccessful): void;

/**
* Getter for is_retryable
*
* @return bool
*/
public function getIsRetryable(): bool;

/**
* Setter for is_retryable
*
* @param bool $isRetryable
* @return void
*/
public function setIsRetryable(bool $isRetryable): void;

/**
* Getter for retry_after
*
* @return int|null
*/
public function getRetryAfter(): ?int;

/**
* Setter for retry_after
*
* @param int $retryAfter
* @return void
*/
public function setRetryAfter(int $retryAfter): void;
}
12 changes: 12 additions & 0 deletions Api/RetryManagementInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace MageOS\AsyncEvents\Api;

interface RetryManagementInterface
{
public function init(int $subscriptionId, mixed $data, string $uuid): void;

public function place(int $deathCount, int $subscriptionId, mixed $data, string $uuid, ?int $backoff): void;

public function kill(int $subscriptionId, mixed $data): void;
}
35 changes: 34 additions & 1 deletion Helper/NotifierResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
namespace MageOS\AsyncEvents\Helper;

use Magento\Framework\DataObject;
use MageOS\AsyncEvents\Api\Data\ResultInterface;

class NotifierResult extends DataObject
class NotifierResult extends DataObject implements ResultInterface
{
private const SUCCESS = 'success';
private const SUBSCRIPTION_ID = 'subscription_id';
private const RESPONSE_DATA = 'response_data';
private const UUID = 'uuid';
private const DATA = 'data';
private const IS_RETRYABLE = 'is_retryable';
private const RETRY_AFTER = 'retry_after';

/**
* Getter for success
Expand Down Expand Up @@ -118,4 +121,34 @@ public function setAsyncEventData(array $eventData): void
{
$this->setData(self::DATA, $eventData);
}

public function getIsSuccessful(): bool
{
return $this->getSuccess();
}

public function setIsSuccessful(bool $isSuccessful): void
{
$this->setSuccess($isSuccessful);
}

public function getIsRetryable(): bool
{
return (bool) $this->getData(self::IS_RETRYABLE);
}

public function setIsRetryable(bool $isRetryable): void
{
$this->setData(self::IS_RETRYABLE, $isRetryable);
}

public function getRetryAfter(): ?int
{
return $this->getData(self::RETRY_AFTER);
}

public function setRetryAfter(int $retryAfter): void
{
$this->setData(self::RETRY_AFTER, $retryAfter);
}
}
16 changes: 11 additions & 5 deletions Model/RetryHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,21 @@ public function process(array $message): void
foreach ($asyncEvents as $asyncEvent) {
$handler = $asyncEvent->getMetadata();
$notifier = $this->notifierFactory->create($handler);
$response = $notifier->notify($asyncEvent, [
$result = $notifier->notify($asyncEvent, [
'data' => $data
]);
$response->setUuid($uuid);
$this->log($response);
$result->setUuid($uuid);
$this->log($result);

if (!$response->getSuccess()) {
if (!$result->getIsSuccessful() && $result->getIsRetryable()) {
if ($deathCount < $maxDeaths) {
$this->retryManager->place($deathCount + 1, $subscriptionId, $data, $uuid);
$this->retryManager->place(
++$deathCount,
$subscriptionId,
$data,
$uuid,
$result->getRetryAfter()
);
} else {
$this->retryManager->kill($subscriptionId, $data);
}
Expand Down
8 changes: 4 additions & 4 deletions Service/AsyncEvent/EventDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ public function dispatch(string $eventName, mixed $output, int $storeId = 0): vo

$notifier = $this->notifierFactory->create($handler);

$response = $notifier->notify(
$result = $notifier->notify(
$asyncEvent,
[
'data' => $output
]
);

$uuid = $this->identityService->generateId();
$response->setUuid($uuid);
$result->setUuid($uuid);

$this->log($response);
$this->log($result);

if (!$response->getSuccess()) {
if (!$result->getIsSuccessful() && $result->getIsRetryable()) {
$this->retryManager->init($asyncEvent->getSubscriptionId(), $output, $uuid);
}
}
Expand Down
16 changes: 10 additions & 6 deletions Service/AsyncEvent/HttpNotifier.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,30 @@ public function notify(AsyncEventInterface $asyncEvent, array $data): NotifierRe
]
);

$notifierResult->setSuccess(
$response->getStatusCode() >= 200
&& $response->getStatusCode() < 300
);

$notifierResult->setIsSuccessful(true);
$notifierResult->setResponseData($response->getBody()->getContents());

} catch (RequestException $exception) {
/**
* Catch a RequestException, so we cover even the network layer exceptions which might sometimes
* not have a response.
*/
$notifierResult->setSuccess(false);
$notifierResult->setIsSuccessful(false);

if ($exception->hasResponse()) {
$response = $exception->getResponse();
$responseContent = $response->getBody()->getContents();
$exceptionMessage = !empty($responseContent) ? $responseContent : $response->getReasonPhrase();

$notifierResult->setResponseData($exceptionMessage);
$notifierResult->setIsRetryable(true);

if ($response->hasHeader('Retry-After')) {
$retryAfter = $response->getHeader('Retry-After')[0];
if (is_numeric($retryAfter)) {
$notifierResult->setRetryAfter((int) $retryAfter);
}
}
} else {
$notifierResult->setResponseData(
$exception->getMessage()
Expand Down
6 changes: 3 additions & 3 deletions Service/AsyncEvent/NotifierInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace MageOS\AsyncEvents\Service\AsyncEvent;

use MageOS\AsyncEvents\Api\Data\AsyncEventInterface;
use MageOS\AsyncEvents\Helper\NotifierResult;
use MageOS\AsyncEvents\Api\Data\ResultInterface;

interface NotifierInterface
{
Expand All @@ -14,7 +14,7 @@ interface NotifierInterface
*
* @param AsyncEventInterface $asyncEvent
* @param array $data
* @return NotifierResult
* @return ResultInterface
*/
public function notify(AsyncEventInterface $asyncEvent, array $data): NotifierResult;
public function notify(AsyncEventInterface $asyncEvent, array $data): ResultInterface;
}
12 changes: 9 additions & 3 deletions Service/AsyncEvent/RetryManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace MageOS\AsyncEvents\Service\AsyncEvent;

use MageOS\AsyncEvents\Api\RetryManagementInterface;
use MageOS\AsyncEvents\Helper\QueueMetadataInterface;
use Magento\Framework\Amqp\ConfigPool;
use Magento\Framework\Amqp\Topology\BindingInstallerInterface;
Expand All @@ -12,7 +13,7 @@
use Magento\Framework\MessageQueue\Topology\Config\QueueConfigItemFactory;
use Magento\Framework\Serialize\SerializerInterface;

class RetryManager
class RetryManager implements RetryManagementInterface
{
public const DEATH_COUNT = 'death_count';
public const SUBSCRIPTION_ID = 'subscription_id';
Expand Down Expand Up @@ -70,11 +71,15 @@ public function init(int $subscriptionId, mixed $data, string $uuid): void
* @param int $subscriptionId
* @param mixed $data
* @param string $uuid
* @param int|null $backoff
* @return void
*/
public function place(int $deathCount, int $subscriptionId, mixed $data, string $uuid): void
public function place(int $deathCount, int $subscriptionId, mixed $data, string $uuid, ?int $backoff): void
{
$backoff = $this->calculateBackoff($deathCount);
if (!$backoff) {
$backoff = $this->calculateBackoff($deathCount);
}

$queueName = 'event.delay.' . $backoff;
$retryRoutingKey = 'event.retry.' . $backoff;

Expand Down Expand Up @@ -120,6 +125,7 @@ public function kill(int $subscriptionId, mixed $data): void
private function assertDelayQueue(int $backoff, string $queueName, string $retryRoutingKey): void
{
$config = $this->configPool->get('amqp');
$backoff = abs($backoff);

$queueConfigItem = $this->queueConfigItemFactory->create();
$queueConfigItem->setData([
Expand Down
Loading

0 comments on commit 5ea908b

Please sign in to comment.