ZfrEbWorker is a simple abstraction around SQS, aims to simplify the creation of app in Elastic Beanstalk.
- PHP 7.1+
Installation of ZfrEbWorker is only officially supported using Composer:
php composer.phar require 'zfr/zfr-eb-worker:6.*'
You can learn more about how Elastic Beanstalk worker/CRON work here: http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html
Note that Elastic Beanstalk automatically delete the message from the queue if you return a 200. That's why this package does not have any "deleteMessage".
ZfrEbWorker expects that you registers the Aws\Sdk
class to the container of your choice. You are free to configure the SDK the way
you prefer, so ZfrEbWorker does not come with a default factory for that.
As an example, here is a simple ContainerInterop
compatible factory:
<?php
use Aws\Sdk as AwsSdk;
use Interop\Container\ContainerInterface;
use RuntimeException;
class AwsSdkFactory
{
/**
* @param ContainerInterface $container
* @return AwsSdk
*/
public function __invoke(ContainerInterface $container): AwsSdk
{
$config = $container->get('config');
if (!isset($config['aws'])) {
throw new RuntimeException('Key "aws" is missing');
}
return new AwsSdk($config['aws']);
}
}
Then register your factory (this example is using the Zend\ServiceManager style config):
use Aws\Sdk as AwsSdk;
return [
'dependencies' => [
'factories' => [
AwsSdk::class => AwsSdkFactory::class
]
]
];
Finally, modify your configuration to add the aws
key to your config file (then, follows the official AWS SDK documentation
to know all the possible keys):
return [
'aws' => [
'region' => 'us-east-1', // Replace by your region
'Sqs' => ['version' => '2012-11-05'], // Add all your other services
'credentials' => [
'key' => 'YOUR_USER_KEY',
'secret' => 'YOUR_SECRET_KEY'
]
]
];
First, make sure to configure the ZfrEbWorker library by adding this config:
'zfr_eb_worker' => [
'queues' => [
'first_queue' => 'https://sqs.us-east-1.amazon.com/foo',
'second_queue' => 'https://sqs.us-east-1.amazon.com/bar'
],
'messages' => [
'project.created' => SendCampaignListener::class,
'image.saved' => ProcessImageListener::class
]
The queues
is an associative array of queue name and queue URL hosted on AWS SQS, while messages
is an associative array that map
a message name to a specific listeners (each listener is just a standard middleware).
You should register the WorkerMiddleware
in your router to respond the "/internal/worker" path.
This middleware consumes the messages sent by Elastic Beanstalk worker environment and routes to the mapped listener. For example, in Zend Expressive:
use ZfrEbWorker\Middleware\WorkerMiddleware;
$app->post('/internal/worker', WorkerMiddleware::class);
Then, you should configure your Elastic Beanstalk worker environment to push messages to "/internal/worker" URL (this is the default URL configured if you use Zend Expressive). By default, ZfrEbWorker do additional security checks to ensure that the request is coming from localhost (as the daemon is installed on EC2 instances directly and push the messages locally):
You can push messages by injecting a MessageQueueInterface
object into your classes.
You can create a queue easily, pre-configured, by using a MessageQueueRepositoryInterface
instance. For instance, assuming
the following config:
'zfr_eb_worker' => [
'queues' => [
'first_queue' => 'https://sqs.us-east-1.amazon.com/foo'
]
]
You can use the repository to create the queue, configured with the URL:
$queue = $queueRepository->getMessageQueue('first_queue');
Once you have a configured queue, you can add one or more messages, then flush the queue. When flushing, the library will make sure to do as few call as possible to SQS (using optimized SQS batch API), and to multiple queues:
$queue->push(new Message('image.saved', ['image_id' => 123]));
$queue->push(new Message('imave.saved', ['image_id' => 456]));
// ...
$queue->flush();
The push
method accepts as a first argument a MessageInterface
object, which is a thin wrapper for both a message
name and payload. ZfrEbWorker provides a default Message
class.
You can also push delayed message (up to 600 seconds) by using the specialized DelayedMessage class:
Example usage:
$queue->push(new DelayedMessage('image.saved', ['image_id' => 123], 60));
Note: if you are using a FIFO queue, this won't have any effect. On FIFO queue, delay can only be applied globally for a queue.
Starting from version 6, ZfrEbWorker supports FIFO queues. You can provide custom group ID and deduplication ID in the third and fourth parameters, respectively:
$message = new Message('image.saved', ['image_id' => 123], 'group_id', 'deduplication_id');
$queue->push($message);
If you do not provide a group ID, a default one will be generated.
ZfrEbWorker will automatically dispatch the incoming request to the middleware specified for the given event. The message information is stored inside various request attributes, as shown below:
use ZfrEbWorker\Middleware\WorkerMiddleware;
class MyEventMiddleware
{
public function __invoke($request, $response, $out)
{
$queue = $request->getAttribute(WorkerMiddleware::MATCHED_QUEUE_ATTRIBUTE);
$messageId = $request->getAttribute(WorkerMiddleware::MESSAGE_ID_ATTRIBUTE);
$messagePayload = $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE);
$name = $request->getAttribute(WorkerMiddleware::MESSAGE_NAME_ATTRIBUTE);
}
}
Note: for a periodic task, only the
Middleware::MESSAGE_NAME_ATTRIBUTE
is available.
When ZfrEbWorker don't find a mapped middleware to handle a message, it throws a RuntimeException
, which makes Elastic
Beanstalk retry the message again later. However if you don't want to handle a specific message and don't want Elastic
Beanstalk to retry it later, you should map SilentFailingListener to the message, like that:
'zfr_eb_worker' => [
'messages' => [
'user.updated' => ZfrEbWorker\Listener\SilentFailingListener::class,
]
Elastic Beanstalk also supports periodic tasks through the usage of cron.yaml
file (more info).
ZfrEbWorker supports this use case in the same, unified way.
Simply redirect all your periodic tasks to the same "/internal/worker" route, and make sure that the task name you use is part of your config. For instance, here is a task called "image.backup" that will run every 12 hours:
version: 1
cron:
- name: "image.backup"
url: "/internal/worker"
schedule: "0 */12 * * *"
Then, in your ZfrEbWorker config, just configure it like any other messages:
'zfr_eb_worker' => [
'messages' => [
'image.backup' => ImageBackupListener::class,
]
Starting from version 3.3, ZfrEbWorker comes with Symfony CLI commands that allows:
- to easily push messages into a queue that respect the ZfrEbWorker format.
- to emulate the usage of native Elastic Beanstalk worker to fetch messages and execute them.
This local worker is only meant to be used in development. In production, you should use the native Elastic Beanstalk worker, which is much faster (retrieves up to 10 messages in one SQS call) and is built-in into the Elastic Beanstalk AMI (it is monitored...).
Before using those CLI commands, there are some things you need to setup, as described in following sections.
Make sure that you add those two dependencies in your project (typically, in the require-dev
section of your composer.json
file):
{
"require-dev": {
"symfony/console": "^3.0",
"guzzlehttp/guzzle": "^6.0"
}
}
ZfrEbWorker adds the WorkerCommand
and PublisherCommand
Symfony CLI command into the console
top-key of the config. If you are using this library
with a framework that already uses Symfony CLI, just add the ZfrEbWorker\Cli\WorkerCommand
and/or ZfrEbWorker\Cli\PublisherCommand
commands.
If you are using Zend\Expressive, here is a sample file (call it console.php
for instance) you can add into the public
folder,
alongside your index.php
file:
use Symfony\Component\Console\Application;
chdir(dirname(__DIR__));
require 'vendor/autoload.php';
/** @var \Interop\Container\ContainerInterface $container */
$container = require 'config/container.php';
$application = new Application('Application console');
$commands = $container->get('config')['console']['commands'];
foreach ($commands as $command) {
$application->add($container->get($command));
}
$application->run();
In order to allow the local worker to work, you'll need to add the sqs:GetQueueUrl
, sqs:ReceiveMessage
, sqs:DeleteMessage
and
sqs:SendMessage
permissions to the IAM user you are using locally.
For security reasons, we recommend you to have production and development queues, so that your development IAM user only have access to the development queue and cannot mess with the production queue.
This command allows to easily add messages into an Elastic Beanstalk worker.
Use the following command: php console.php eb-publisher --payload="foo=bar&bar=baz" --queue=my-queue --name=user.created
The payload
key supports an HTML-like query param, so if you want to add the following JSON:
{
"user": {
"first_name": "John",
"last_name": "Doe"
}
}
You can use the following payload: --payload='user[first_name]=John&user[last_name]=Doe'
This command allows to simulate the native Elastic Beanstalk worker.
You can now write the command php console.php eb-worker --server=http://localhost --queue=my-queue
. This code will automatically poll
the queue called my-queue
, and push messages to the URL indicated by the server
option with the /internal/worker
path added (as this
is the default ZfrEbWorker configuration).
Therefore, in this example, the local worker will make a POST request to http://localhost/internal/worker
whenever a new message is
added. The local worker behaves exactly the same way the native Elastic Beanstalk worker does, and adds all the same HTTP headers.