Skip to content

Commit

Permalink
Integrate SQS adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
papac committed Sep 22, 2023
1 parent 8ae5e89 commit 12a8302
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 52 deletions.
34 changes: 34 additions & 0 deletions src/Console/Command/GenerateDatabaseQueueCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Bow\Console\Command;

use Bow\Console\Color;
use Bow\Console\Generator;
use Bow\Support\Str;

class GenerateDatabaseQueueCommand extends AbstractCommand
{
/**
* Generate session
*
* @return void
*/
public function generate(): void
{
$create_at = date("YmdHis");
$filename = sprintf("Version%s%sTable", $create_at, ucfirst(Str::camel('DatabaseQueue')));

$generator = new Generator(
$this->setting->getMigrationDirectory(),
$filename
);

$generator->write('model/queue', [
'className' => $filename
]);

echo Color::green('Queue migration created.');
}
}
28 changes: 28 additions & 0 deletions src/Console/stubs/model/queue.stub
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

use Bow\Database\Migration\Migration;
use Bow\Database\Migration\SQLGenerator;

class {className} extends Migration
{
/**
* Up Migration
*/
public function up(): void
{
$this->create("queues", function (SQLGenerator $table) {
$table->addColumn('id', 'string', ['primary' => true]);
$table->addColumn('time', 'timestamp');
$table->addColumn('data', 'text');
$table->addColumn('ip', 'string');
});
}

/**
* Rollback migration
*/
public function rollback(): void
{
$this->dropIfExists("sessions");
}
}
4 changes: 1 addition & 3 deletions src/Queue/Adapters/BeanstalkdAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,10 @@ public function size(?string $queue = null): int
* Queue a job
*
* @param ProducerService $producer
* @return QueueAdapter
* @return void
*/
public function push(ProducerService $producer): void
{
// TODO: should be removed
// $this->flush();
$queues = (array) cache("beanstalkd:queues");

if (!in_array($producer->getQueue(), $queues)) {
Expand Down
2 changes: 1 addition & 1 deletion src/Queue/Adapters/DatabaseAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public function configure(array $queue): DatabaseAdapter
* Get connexion
*
* @param string $name
* @return Pheanstalk
* @return void
*/
public function setWatch(string $name): void
{
Expand Down
87 changes: 47 additions & 40 deletions src/Queue/Adapters/SQSAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class SQSAdapter extends QueueAdapter
*
* @var SqsClient
*/
private SqsClient $client;
private SqsClient $sqs;

/**
* The configuration array
Expand Down Expand Up @@ -59,15 +59,7 @@ public function configure(array $config): QueueAdapter

$this->config = $config;

$this->client = new SqsClient([
'profile' => 'default',
'region' => 'us-west-2',
'version' => '2012-11-05',
'credentials' => [
'key' => $config["key"],
'secret' => $config["secret"]
]
]);
$this->sqs = new SqsClient($config);

return $this;
}
Expand Down Expand Up @@ -105,6 +97,28 @@ public function setSleep(int $sleep): void
$this->sleep = $sleep;
}

/**
* Get the queue or return the default.
*
* @param ?string $queue
* @return string
*/
public function getQueue(?string $queue = null): string
{
return $queue ?: $this->queue;
}

/**
* Set the number of seconds to wait before retrying a job.
*
* @param int $retry
* @return void
*/
public function setRetries(int $tries)
{
$this->tries = $tries;
}

/**
* Push a job onto the queue.
*
Expand All @@ -121,12 +135,12 @@ public function push(ProducerService $producer): void
'StringValue' => get_class($producer)
],
],
'MessageBody' => $this->serializeProducer($producer),
'MessageBody' => base64_encode($this->serializeProducer($producer)),
'QueueUrl' => $this->config["url"]
];

try {
$result = $this->client->sendMessage($params);
$this->sqs->sendMessage($params);
} catch (AwsException $e) {
error_log($e->getMessage());
}
Expand All @@ -140,7 +154,7 @@ public function push(ProducerService $producer): void
*/
public function size(string $queue): int
{
$response = $this->client->getQueueAttributes([
$response = $this->sqs->getQueueAttributes([
'QueueUrl' => $this->getQueue($queue),
'AttributeNames' => ['ApproximateNumberOfMessages'],
]);
Expand All @@ -150,28 +164,6 @@ public function size(string $queue): int
return (int) $attributes['ApproximateNumberOfMessages'];
}

/**
* Get the queue or return the default.
*
* @param ?string $queue
* @return string
*/
public function getQueue(?string $queue = null): string
{
return $queue ?: $this->queue;
}

/**
* Set the number of seconds to wait before retrying a job.
*
* @param int $retry
* @return void
*/
public function setRetries(int $tries)
{
$this->tries = $tries;
}

/**
* Process the next job on the queue.
*
Expand All @@ -183,7 +175,7 @@ public function run(?string $queue = null): void
$this->sleep($this->sleep ?? 5);

try {
$result = $this->client->receiveMessage([
$result = $this->sqs->receiveMessage([
'AttributeNames' => ['SentTimestamp'],
'MaxNumberOfMessages' => 1,
'MessageAttributeNames' => ['All'],
Expand All @@ -196,23 +188,27 @@ public function run(?string $queue = null): void
return;
}
$message = $result->get('Messages')[0];
$producer = $this->unserializeProducer($message["MessageBody"]);
$producer = $this->unserializeProducer(base64_decode($message["Body"]));
$delay = $producer->getDelay();
call_user_func([$producer, "process"]);
$result = $this->client->deleteMessage([
$result = $this->sqs->deleteMessage([
'QueueUrl' => $this->config["url"],
'ReceiptHandle' => $message['ReceiptHandle']
]);
} catch (AwsException $e) {
error_log($e->getMessage());
app('logger')->error($e->getMessage(), $e->getTrace());
if (!isset($producer)) {
$this->sleep(1);
return;
}
if ($producer->jobShouldBeDelete()) {
$result = $this->client->deleteMessage([
$result = $this->sqs->deleteMessage([
'QueueUrl' => $this->config["url"],
'ReceiptHandle' => $message['ReceiptHandle']
]);
} else {
$result = $this->client->changeMessageVisibilityBatch([
$result = $this->sqs->changeMessageVisibilityBatch([
'QueueUrl' => $this->config["url"],
'Entries' => [
'Id' => $producer->getId(),
Expand All @@ -224,4 +220,15 @@ public function run(?string $queue = null): void
$this->sleep(1);
}
}

/**
* flush the queue.
*
* @param ?string $queue
* @return void
*/
public function flush(?string $queue = null): void
{

}
}
3 changes: 2 additions & 1 deletion tests/Config/stubs/app.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

return [
"root" => "",
"auto_csrf" => false
"auto_csrf" => false,
"env_file" => realpath(__DIR__ . "/../../Support/stubs/env.json"),
];
14 changes: 11 additions & 3 deletions tests/Config/stubs/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,22 @@
* The sqs connexion
*/
"sqs" => [
'url' => app_env('SQS_URL', 'https://sqs.ap-south-1.amazonaws.com/242848748621/messaging'),
'profile' => 'default',
'region' => 'ap-south-1',
'version' => 'latest',
'url' => app_env("AWS_SQS_URL"),
'bucket' => app_env('AWS_S3_BUCKET'),
'credentials' => [
'key' => app_env('AWS_KEY'),
'secret' => app_env('AWS_SECRET'),
],
],

/**
* The sqs connexion
*/
"database" => [
'table' => "queue_jobs",
'table' => "queues",
]
]
];
4 changes: 2 additions & 2 deletions tests/Config/stubs/storage.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
's3' => [
"driver" => "s3",
'credentials' => [
'key' => app_env('AWS_S3_KEY'),
'secret' => app_env('AWS_S3_SECRET'),
'key' => app_env('AWS_KEY'),
'secret' => app_env('AWS_SECRET'),
],
'bucket' => app_env('AWS_S3_BUCKET'),
'region' => 'us-east-1',
Expand Down
6 changes: 4 additions & 2 deletions tests/Queue/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Bow\Cache\Adapter\RedisAdapter;
use Bow\Cache\Cache;
use Bow\Cache\CacheConfiguration;
use Bow\Configuration\EnvConfiguration;
use Bow\Configuration\LoggerConfiguration;
use Bow\Database\Database;
use Bow\Database\DatabaseConfiguration;
Expand All @@ -27,6 +28,7 @@ public static function setUpBeforeClass(): void
LoggerConfiguration::class,
DatabaseConfiguration::class,
CacheConfiguration::class,
EnvConfiguration::class,
]);

$config = TestingConfiguration::getConfig();
Expand Down Expand Up @@ -113,8 +115,8 @@ public function test_push_service_adapter_with_model($connection)
public function getConnection(): array
{
return [
["beanstalkd"],
// ["sqs"],
// ["beanstalkd"],
["sqs"],
// ["redis"],
// ["rabbitmq"],
// ["database"]
Expand Down

0 comments on commit 12a8302

Please sign in to comment.