Skip to content

Commit

Permalink
Merge pull request #66 from Vinelab/enhancement/dlx
Browse files Browse the repository at this point in the history
Release v0.8.1
  • Loading branch information
KinaneD authored Oct 12, 2021
2 parents 7b1333e + d8341a6 commit 34db062
Show file tree
Hide file tree
Showing 23 changed files with 138 additions and 119 deletions.
3 changes: 2 additions & 1 deletion src/Ack.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Ack

/**
* Ack constructor.
*
* @param string $mode
* @param bool $requeue
* @param bool $multiple
Expand All @@ -53,4 +54,4 @@ public function __get($name)
return $this->$name;
}
}
}
}
6 changes: 3 additions & 3 deletions src/BowlerServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class BowlerServiceProvider extends ServiceProvider
public function boot()
{
$this->publishes([
__DIR__.'/../config/bowler.php' => config_path('bowler.php'),
__DIR__ . '/../config/bowler.php' => config_path('bowler.php'),
]);
}

Expand All @@ -34,7 +34,7 @@ public function boot()
*/
public function register()
{
$this->mergeConfigFrom(dirname(__DIR__).'/config/bowler.php', 'bowler');
$this->mergeConfigFrom(dirname(__DIR__) . '/config/bowler.php', 'bowler');

// register facade to resolve instance
$this->app->singleton('vinelab.bowler.registrator', function ($app) {
Expand Down Expand Up @@ -74,7 +74,7 @@ public function register()

$this->app->when(BowlerExceptionHandler::class)
->needs(ExceptionHandler::class)
->give($this->app->getNamespace().'Exceptions\Handler');
->give($this->app->getNamespace() . 'Exceptions\Handler');

//register command
$this->commands([
Expand Down
22 changes: 12 additions & 10 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Vinelab\Bowler;

define('__ROOT__', dirname(dirname(dirname(__FILE__))));

//require_once(__ROOT__.'/vendor/autoload.php');

use PhpAmqpLib\Channel\AMQPChannel;
Expand Down Expand Up @@ -82,18 +83,19 @@ class Connection

/**
* RabbitMQ vhost.
*
* @var string
*/
private $vhost = '/';

/**
* @param string $host the ip of the rabbitmq server, default: localhost
* @param int $port. default: 5672
* @param string $username, default: guest
* @param string $password, default: guest
* @param int $connectionTimeout, default: 30
* @param int $readWriteTimeout, default: 30
* @param int $heartbeat, default: 15
* @param string $host the ip of the rabbitmq server, default: localhost
* @param int $port . default: 5672
* @param string $username , default: guest
* @param string $password , default: guest
* @param int $connectionTimeout , default: 30
* @param int $readWriteTimeout , default: 30
* @param int $heartbeat , default: 15
*/
public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest', $connectionTimeout = 30, $readWriteTimeout = 30, $heartbeat = 15, $vhost = '/')
{
Expand Down Expand Up @@ -151,8 +153,8 @@ public function getChannel()
/**
* Fetch the list of consumers details for the given queue name using the management API.
*
* @param string $queueName
* @param string $columns
* @param string $queueName
* @param string $columns
*
* @return array
*/
Expand All @@ -161,7 +163,7 @@ public function fetchQueueConsumers($queueName, string $columns = 'consumer_deta
$http = app(HTTPClient::class);

$request = [
'url' => $this->host.':'.$this->managementPort.'/api/queues/%2F/'.$queueName,
'url' => $this->host . ':' . $this->managementPort . '/api/queues/%2F/' . $queueName,
'params' => ['columns' => $columns],
'auth' => [
'username' => $this->username,
Expand Down
30 changes: 18 additions & 12 deletions src/Console/Commands/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ConsumeCommand extends Command

/**
* ConsumeCommand constructor.
*
* @param RegisterQueues $registrator
*/
public function __construct(RegisterQueues $registrator)
Expand Down Expand Up @@ -63,6 +64,7 @@ public function __construct(RegisterQueues $registrator)

/**
* Run the command.
*
* @throws UnregisteredQueueException
* @throws ErrorException
* @throws BowlerGeneralException
Expand All @@ -74,16 +76,19 @@ public function handle()
$queueName = $this->argument('queueName');

// Options
$exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; // If the exchange name has not been set, use the queue name
$exchangeName = ($name = $this->option('exchangeName')) ? $name
: $queueName; // If the exchange name has not been set, use the queue name
$exchangeType = $this->option('exchangeType');
$bindingKeys = (array) $this->option('bindingKeys');
$passive = (bool) $this->option('passive');
$durable = (bool) $this->option('durable');
$autoDelete = (bool) $this->option('autoDelete');

// Dead Lettering
$deadLetterQueueName = ($dlQueueName = $this->option('deadLetterQueueName')) ? $dlQueueName : (($dlExchangeName = $this->option('deadLetterExchangeName')) ? $dlExchangeName : null);
$deadLetterExchangeName = ($dlExchangeName = $this->option('deadLetterExchangeName')) ? $dlExchangeName : (($dlQueueName = $this->option('deadLetterQueueName')) ? $dlQueueName : null);
$deadLetterQueueName = ($dlQueueName = $this->option('deadLetterQueueName')) ? $dlQueueName
: (($dlExchangeName = $this->option('deadLetterExchangeName')) ? $dlExchangeName : null);
$deadLetterExchangeName = ($dlExchangeName = $this->option('deadLetterExchangeName')) ? $dlExchangeName
: (($dlQueueName = $this->option('deadLetterQueueName')) ? $dlQueueName : null);
$deadLetterExchangeType = $this->option('deadLetterExchangeType');
$deadLetterRoutingKey = $this->option('deadLetterRoutingKey');
$messageTTL = ($ttl = $this->option('messageTTL')) ? (int) $ttl : null;
Expand All @@ -94,18 +99,19 @@ public function handle()
foreach ($handlers as $handler) {
if ($handler->queueName == $queueName) {

// If options are set in Registrator:queue(string $queueName,string $Handler, array $options).
if (!empty($handler->options)) {
// Use whatever the user has set/provided, to override our defaults.
extract($handler->options);
}
// If options are set in Registrator:queue(string $queueName,string $Handler, array $options).
if (!empty($handler->options)) {
// Use whatever the user has set/provided, to override our defaults.
extract($handler->options);
}

$bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete);

if ($deadLetterQueueName) {

// If configured as options and deadLetterExchangeName is not specified, default to deadLetterQueueName.
$deadLetterExchangeName = isset($deadLetterExchangeName) ? $deadLetterExchangeName : $deadLetterQueueName;
$deadLetterExchangeName = isset($deadLetterExchangeName) ? $deadLetterExchangeName
: $deadLetterQueueName;

$bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTTL);
}
Expand All @@ -114,15 +120,15 @@ public function handle()
}
}

throw new UnregisteredQueueException('No registered queue found with name '.$queueName.'.');
throw new UnregisteredQueueException('No registered queue found with name ' . $queueName . '.');
}

public function loadQueuesDefinitions()
{
$path = app_path().'/Messaging/queues.php';
$path = app_path() . '/Messaging/queues.php';

if (!file_exists($path)) {
return $this->error('Queues definitions file not found. Please create it at '.$path);
return $this->error('Queues definitions file not found. Please create it at ' . $path);
}

require $path;
Expand Down
8 changes: 4 additions & 4 deletions src/Console/Commands/ConsumerHealthCheckCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ public function handle()
// find consumer tag within the list of returned consumers
foreach ($response->consumer_details as $consumer) {
if (isset($consumer->consumer_tag) && $consumer->consumer_tag == $tag) {
$this->info('Healthy consumer with tag '.$tag);
$this->info('Healthy consumer with tag ' . $tag);

return 0;
}
}

$this->error('Health check failed! Could not find consumer with tag "'.$tag.'"');
$this->error('Health check failed! Could not find consumer with tag "' . $tag . '"');

return 1;
}

$this->error('No consumers connected to queue "'.$queueName.'"');
$this->error('No consumers connected to queue "' . $queueName . '"');

return 1;
} catch (AMQPProtocolChannelException $e) {
switch ($e->getCode()) {
case 404:
$this->error('Queue with name '.$queueName.' does not exist.');
$this->error('Queue with name ' . $queueName . ' does not exist.');
break;
default:
$this->error('An unknown channel exception occurred.');
Expand Down
8 changes: 4 additions & 4 deletions src/Console/Commands/QueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public function handle()
$handlerGenerator = new HandlerGenerator();

$queue = $this->argument('queueName');
$handler = Str::studly(preg_replace('/Handler(\.php)?$/', '', $this->argument('handler')).'Handler');
$handler = Str::studly(preg_replace('/Handler(\.php)?$/', '', $this->argument('handler')) . 'Handler');

try {
$handlerGenerator->generate($queue, $handler, self::TYPE);

$this->info(
'Queue '.$queue.' added successfully.'.
"\n".
'Handler class '.$handler.' created successfully.'.
'Queue ' . $queue . ' added successfully.' .
"\n" .
'Handler class ' . $handler . ' created successfully.' .
"\n"
);
} catch (Exception $e) {
Expand Down
10 changes: 5 additions & 5 deletions src/Console/Commands/SubscriberCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ public function handle()
$queue = $this->argument('queueName');

if ($this->option('expressive')) {
$queue = $queue.'-pub-sub';
$queue = $queue . '-pub-sub';
}

$handler = Str::studly(preg_replace('/Handler(\.php)?$/', '', $this->argument('handler')).'Handler');
$handler = Str::studly(preg_replace('/Handler(\.php)?$/', '', $this->argument('handler')) . 'Handler');

try {
$handlerGenerator->generate($queue, $handler, self::TYPE);

$this->info(
'Queue '.$queue.' added successfully and bound to the default `pub-sub` exchange.'.
"\n".
'Handler class '.$handler.' created successfully.'.
'Queue ' . $queue . ' added successfully and bound to the default `pub-sub` exchange.' .
"\n" .
'Handler class ' . $handler . ' created successfully.' .
"\n"
);
} catch (Exception $e) {
Expand Down
2 changes: 1 addition & 1 deletion src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Dispatcher extends Producer
* Part of the fair dispatch implementation
* Allow setting the exchange name and type with default of `topic`.
*
* @param Connection $connection
* @param Connection $connection
*/
public function __construct(Connection $connection)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Facades/Registrator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

/**
* @method static void queue(string $queue, string $className, array $options)
* @method static void subscriber(string $queue, string $className, array $bindingKeys, string $exchangeName, string $exchangeType = 'topic')
* @method static void subscriber(string $queue, string $className, array $bindingKeys, string $exchangeName, string $exchangeType = 'topic', array $options = [])
* @method static array getHandlers()
*
* @see \Vinelab\Bowler\RegisterQueues
Expand Down
22 changes: 12 additions & 10 deletions src/Generators/HandlerGenerator.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ private function registerHandler($queue, $handler, $queuePath, $handlerNamespace
{
// Get queue stub content and replace variables with values
$queueContent = file_get_contents($this->getQueueStub());
$queueContent = str_replace(['{{type}}', '{{queue}}', '{{handler}}'], [$type, "'".$queue."'", "'".$handlerNamespace.'\\'.$handler."'"], $queueContent);
$queueContent = str_replace(['{{type}}', '{{queue}}', '{{handler}}'], [$type,
"'" . $queue . "'",
"'" . $handlerNamespace . '\\' . $handler . "'"], $queueContent);

// Remove `<?php` string if file already exist
if (file_exists($queuePath) && !empty(file_get_contents($queuePath))) {
Expand All @@ -54,23 +56,23 @@ private function generateHandler($handler, $handlerPath, $handlerNamespace)
}

// Create Handler
file_put_contents($handlerPath.$handler.'.php', $handlerContent);
file_put_contents($handlerPath . $handler . '.php', $handlerContent);
}

/**
* Find queue absolute path.
*/
private function findQueuePath()
{
return app_path().'/Messaging/queues.php';
return app_path() . '/Messaging/queues.php';
}

/**
* Find handler absolute path.
*/
private function findHandlerPath()
{
return app_path().'/Messaging/Handlers/';
return app_path() . '/Messaging/Handlers/';
}

/**
Expand All @@ -80,23 +82,23 @@ private function findHandlerNamespace()
{
$rootNamespace = $this->findRootNamespace();

return $rootNamespace.'\Messaging\Handlers';
return $rootNamespace . '\Messaging\Handlers';
}

/**
* Find queue stub absolute path.
*/
private function getQueueStub()
{
return __DIR__.'/stubs/queue.stub';
return __DIR__ . '/stubs/queue.stub';
}

/**
* Find handler stub absolute path.
*/
private function getHandlerStub()
{
return __DIR__.'/stubs/handler.stub';
return __DIR__ . '/stubs/handler.stub';
}

/**
Expand All @@ -109,10 +111,10 @@ private function getHandlerStub()
private function findRootNamespace()
{
// read composer.json file contents to determine the namespace
$composer = json_decode(file_get_contents(base_path().'/composer.json'), true);
$composer = json_decode(file_get_contents(base_path() . '/composer.json'), true);
// see which one refers to the "src/" directory
foreach ($composer['autoload']['psr-4'] as $namespace => $directory) {
if ($directory === $this->getSourceDirectoryName().'/') {
if ($directory === $this->getSourceDirectoryName() . '/') {
return trim($namespace, '\\');
}
}
Expand All @@ -128,7 +130,7 @@ private function findRootNamespace()
*/
private function getSourceDirectoryName()
{
if (file_exists(base_path().'/'.$this->srcDirectoryName)) {
if (file_exists(base_path() . '/' . $this->srcDirectoryName)) {
return $this->srcDirectoryName;
}

Expand Down
2 changes: 2 additions & 0 deletions src/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
class Handler
{
public $queueName = '';

public $className = '';

public $options = [];
}
7 changes: 4 additions & 3 deletions src/MessageBroker.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class MessageBroker

/**
* MessageBroker constructor.
*
* @param AMQPMessage $message
* @param string $queueName
* @param string $handlerClass
Expand Down Expand Up @@ -78,8 +79,8 @@ public function ackMessage()
/**
* Negatively acknowledge a message.
*
* @param bool $multiple
* @param bool $requeue
* @param bool $multiple
* @param bool $requeue
*/
public function nackMessage($multiple = false, $requeue = false)
{
Expand All @@ -90,7 +91,7 @@ public function nackMessage($multiple = false, $requeue = false)
/**
* Reject a message.
*
* @param bool $requeue
* @param bool $requeue
*/
public function rejectMessage($requeue = false)
{
Expand Down
Loading

0 comments on commit 34db062

Please sign in to comment.