diff --git a/README.md b/README.md index 5c80dbb..fa6779c 100644 --- a/README.md +++ b/README.md @@ -358,13 +358,13 @@ To do so, the default laravel exception handler normaly located in `app\Exceptio ### Health Checks +**IMPORTANT: Management plugin is required to be installed in order to perform health checks.** + Based on [this Reliability Guide](https://www.rabbitmq.com/reliability.html), Bowler figured that it would be beneficial to provide a tool to check the health of connected consumers and is provided through the `bowler:healthcheck:consumer` command with the following signature: ``` -bowler:healthcheck:consumer - {queueName : The queue name} - {--c|consumers=1 : The expected number of consumers to be connected to the queue specified by queueName} +bowler:healthcheck:consumer {queueName : The queue name} ``` Example: `php artisan bowler:healthcheck:consumer the-queue` diff --git a/composer.json b/composer.json index 8cdeac0..4678f6b 100644 --- a/composer.json +++ b/composer.json @@ -27,7 +27,9 @@ "php": ">=7.0", "php-amqplib/php-amqplib": "v2.6.1", "illuminate/console": "5.*", - "illuminate/support": "5.*" + "illuminate/support": "5.*", + "vinelab/http": "^1.5", + "illuminate/filesystem": "5.x" }, "require-dev": { diff --git a/src/BowlerServiceProvider.php b/src/BowlerServiceProvider.php index 647b10b..79de40e 100644 --- a/src/BowlerServiceProvider.php +++ b/src/BowlerServiceProvider.php @@ -19,10 +19,16 @@ class BowlerServiceProvider extends ServiceProvider */ public function register() { + // register facade to resolve instance $this->app->singleton('vinelab.bowler.registrator', function ($app) { return new RegisterQueues(); }); + // use the same Registrator instance all over the app (to make it injectable). + $this->app->singleton(RegisterQueues::class, function ($app) { + return $app['vinelab.bowler.registrator']; + }); + // Bind connection to env configuration $rbmqHost = config('queue.connections.rabbitmq.host'); $rbmqPort = config('queue.connections.rabbitmq.port'); diff --git a/src/Connection.php b/src/Connection.php index 0774e46..6309102 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -5,6 +5,7 @@ define('__ROOT__', dirname(dirname(dirname(__FILE__)))); //require_once(__ROOT__.'/vendor/autoload.php'); +use Vinelab\Http\Client as HTTPClient; use PhpAmqpLib\Connection\AMQPStreamConnection; /** @@ -29,6 +30,34 @@ class Connection */ private $channel; + /** + * RabbitMQ server host. + * + * @var string + */ + private $host = 'localhost'; + + /** + * Management plugin's port. + * + * @var int + */ + private $managementPort = 15672; + + /** + * RabbitMQ server username. + * + * @var string + */ + private $username = 'guest'; + + /** + * RabbitMQ server password. + * + * @var string + */ + private $password = 'guest'; + /** * @param string $host the ip of the rabbitmq server, default: localhost * @param int $port. default: 5672 @@ -37,6 +66,11 @@ class Connection */ public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest') { + $this->host = $host; + $this->poart = $port; + $this->username = $username; + $this->password = $password; + $this->connection = new AMQPStreamConnection( $host, $port, @@ -67,6 +101,30 @@ public function getChannel() return $this->channel; } + /** + * Fetch the list of consumers details for the given queue name using the management API. + * + * @param string $queueName + * @param string $columns + * + * @return array + */ + public function fetchQueueConsumers($queueName, string $columns = 'consumer_details.consumer_tag') + { + $http = app(HTTPClient::class); + + $request = [ + 'url' => $this->host.':'.$this->managementPort.'/api/queues/%2F/'.$queueName, + 'params' => ['columns' => $columns], + 'auth' => [ + 'username' => $this->username, + 'password' => $this->password, + ], + ]; + + return $http->get($request)->json(); + } + public function __destruct() { $this->channel->close(); diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 0e2547d..73aa0a7 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -2,12 +2,13 @@ namespace Vinelab\Bowler\Console\Commands; -use Vinelab\Bowler\Consumer; -use Vinelab\Bowler\Connection; use Illuminate\Console\Command; -use Vinelab\Bowler\Facades\Registrator; -use Vinelab\Bowler\Exceptions\UnregisteredQueueException; +use Vinelab\Bowler\Connection; +use Vinelab\Bowler\Consumer; use Vinelab\Bowler\Exceptions\Handler as BowlerExceptionHandler; +use Vinelab\Bowler\Exceptions\UnregisteredQueueException; +use Vinelab\Bowler\Facades\Registrator; +use Vinelab\Bowler\RegisterQueues; /** * @author Ali Issa @@ -17,9 +18,11 @@ class ConsumeCommand extends Command { protected $registerQueues; - public function __construct() + public function __construct(RegisterQueues $registrator) { parent::__construct(); + + $this->registrator = $registrator; } /** @@ -70,8 +73,8 @@ public function handle() $deadLetterRoutingKey = $this->option('deadLetterRoutingKey'); $messageTTL = ($ttl = $this->option('messageTTL')) ? (int) $ttl : null; - require app_path().'/Messaging/queues.php'; - $handlers = Registrator::getHandlers(); + $this->loadQueuesDefinitions(); + $handlers = $this->registrator->getHandlers(); foreach ($handlers as $handler) { if ($handler->queueName == $queueName) { @@ -83,6 +86,7 @@ public function handle() } $bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete); + if ($deadLetterQueueName) { // If configured as options and deadLetterExchangeName is not specified, default to deadLetterQueueName. @@ -90,10 +94,22 @@ public function handle() $bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTTL); } + $bowlerConsumer->listenToQueue($handler->className, app(BowlerExceptionHandler::class)); } } throw new UnregisteredQueueException('No registered queue found with name '.$queueName.'.'); } + + public function loadQueuesDefinitions() + { + $path = app_path().'/Messaging/queues.php'; + + if (!file_exists($path)) { + return $this->error('Queues definitions file not found. Please create it at '.$path); + } + + require $path; + } } diff --git a/src/Console/Commands/ConsumerHealthCheckCommand.php b/src/Console/Commands/ConsumerHealthCheckCommand.php index 52aee09..2959a77 100644 --- a/src/Console/Commands/ConsumerHealthCheckCommand.php +++ b/src/Console/Commands/ConsumerHealthCheckCommand.php @@ -3,30 +3,32 @@ namespace Vinelab\Bowler\Console\Commands; use ErrorException; -use Vinelab\Bowler\Connection; use Illuminate\Console\Command; use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use Vinelab\Bowler\Connection; +use Vinelab\Bowler\Traits\ConsumerTagTrait; /** * @author Abed Halawi */ class ConsumerHealthCheckCommand extends Command { + use ConsumerTagTrait; + /** * The console command name. * * @var string */ protected $signature = 'bowler:healthcheck:consumer - {queueName : The queue name} - {--c|consumers=1 : The expected number of consumers to be connected to the queue specified by queueName}'; + {queueName : The queue name}'; /** * The console command description. * * @var string */ - protected $description = 'Check the health of connected consumers to a queue, with a minimum of 1 connection.'; + protected $description = 'Check the health of connected consumers to a given queue.'; /** * Run the command. @@ -34,13 +36,13 @@ class ConsumerHealthCheckCommand extends Command public function handle() { $queueName = $this->argument('queueName'); - $expectedConsumers = (int) $this->option('consumers'); // may or may not be able to connect try { $connection = app(Connection::class); } catch (ErrorException $e) { $this->error('Unable to connect to RabbitMQ.'); + return 1; } @@ -56,15 +58,31 @@ public function handle() [] ); - // consumer count and minimum consumers connected should match - if ($consumerCount !== $expectedConsumers) { - $this->error('Health check failed. Minimum consumer count not met: expected '.$expectedConsumers.' got '.$consumerCount); + $response = $connection->fetchQueueConsumers($queueName); + + if ($response && isset($response->consumer_details) && !empty($response->consumer_details)) { + // read consumer tag + $tag = $this->readConsumerTag(); + + // find consumer tag within the list of returned consumers + foreach ($response->consumer_details as $consumer) { + if (isset($consumer->consumer_tag) && $consumer->consumer_tag == $tag) { + $this->info('Healthy consumer with tag '.$tag); + + return 0; + } + } + + $this->error('Health check failed! Could not find consumer with tag "'.$tag.'"'); + return 1; } - $this->info('Consumers healthy with '.$consumerCount.' live connections.'); + $this->error('No consumers connected to queue "'.$queueName.'"'); + + return 1; } catch (AMQPProtocolChannelException $e) { - switch($e->getCode()) { + switch ($e->getCode()) { case 404: $this->error('Queue with name '.$queueName.' does not exist.'); break; diff --git a/src/Consumer.php b/src/Consumer.php index 6c0bebc..2d90a6d 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -3,6 +3,7 @@ namespace Vinelab\Bowler; use Vinelab\Bowler\Traits\AdminTrait; +use Vinelab\Bowler\Traits\ConsumerTagTrait; use Vinelab\Bowler\Traits\DeadLetteringTrait; use Vinelab\Bowler\Traits\CompileParametersTrait; use Vinelab\Bowler\Exceptions\Handler as BowlerExceptionHandler; @@ -16,6 +17,7 @@ class Consumer { use AdminTrait; + use ConsumerTagTrait; use DeadLetteringTrait; use CompileParametersTrait; @@ -156,7 +158,9 @@ public function listenToQueue($handlerClass, BowlerExceptionHandler $exceptionHa }; $channel->basic_qos(null, 1, null); - $channel->basic_consume($this->queueName, '', false, false, false, false, $callback); + $tag = $channel->basic_consume($this->queueName, '', false, false, false, false, $callback); + + $this->writeConsumerTag($tag); echo ' [*] Listening to Queue: ', $this->queueName, ' To exit press CTRL+C', "\n"; diff --git a/src/Traits/ConsumerTagTrait.php b/src/Traits/ConsumerTagTrait.php new file mode 100644 index 0000000..4f0e63e --- /dev/null +++ b/src/Traits/ConsumerTagTrait.php @@ -0,0 +1,26 @@ + + */ +trait ConsumerTagTrait +{ + public function getConsumerTagFilename() + { + return 'rabbitmq-consumer.tag'; + } + + private function writeConsumerTag($tag) + { + Storage::disk('local')->put($this->getConsumerTagFilename(), $tag); + } + + public function readConsumerTag() + { + return Storage::disk('local')->get($this->getConsumerTagFilename()); + } +} diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php new file mode 100644 index 0000000..098519a --- /dev/null +++ b/tests/ConnectionTest.php @@ -0,0 +1,49 @@ + + */ +class ConnectionTest extends TestCase +{ + public function tearDown() + { + M::close(); + } + + public function test_fetching_consumers_default() + { + $queueName = 'the-queue'; + $mClient = M::mock(HTTPClient::class); + $request = [ + 'url' => 'localhost:15672/api/queues/%2F/'.$queueName, + 'params' => ['columns' => 'consumer_details.consumer_tag'], + 'auth' => [ + 'username' => 'guest', + 'password' => 'guest', + ], + ]; + + $mClient->shouldReceive('get')->once()->with($request)->andReturn($mClient); + $mClient->shouldReceive('json')->once()->withNoArgs()->andReturn('response'); + + $this->app->bind(HTTPClient::class, function () use ($mClient) { + return $mClient; + }); + + $mConnection = M::mock(Connection::class)->makePartial(); + $this->app->bind(Connection::class, function () use ($mConnection) { + return $mConnection; + }); + + $connection = $this->app[Connection::class]; + $response = $connection->fetchQueueConsumers($queueName); + + $this->assertEquals('response', $response); + } +} diff --git a/tests/Console/Commands/ConsumerHealthCheckCommandTest.php b/tests/Console/Commands/ConsumerHealthCheckCommandTest.php index ebb347e..adf8228 100644 --- a/tests/Console/Commands/ConsumerHealthCheckCommandTest.php +++ b/tests/Console/Commands/ConsumerHealthCheckCommandTest.php @@ -3,12 +3,14 @@ namespace Vinelab\Bowler\Tests\Console\Commands; use Mockery as M; -use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; use Vinelab\Bowler\Connection; -use Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand; use Vinelab\Bowler\Tests\TestCase; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; +/** + * @author Abed Halawi + */ class ConsumerHealthCheckCommandTest extends TestCase { public function tearDown() @@ -18,19 +20,28 @@ public function tearDown() public function test_checking_consumer_successfully() { - $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error]'); - - $queueName = 'queue-to-consume'; + $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error,readConsumerTag]'); + $queueName = 'the-queue'; + $consumerTag = 'tag-1234'; $this->app['Illuminate\Contracts\Console\Kernel']->registerCommand($command); $mConnection = M::mock(Connection::class); + + $command->shouldReceive('readConsumerTag')->once()->andReturn($consumerTag); + $mConnection->shouldReceive('fetchQueueConsumers')->once()->andReturn(json_decode(json_encode([ + 'consumer_details' => [ + ['consumer_tag' => $consumerTag], + ['consumer_tag' => 'another-tag-here'], + ], + ]))); + $mChannel = M::mock(AMQPChannel::class); $mChannel::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091'; $mChannel->shouldReceive('queue_declare')->once()->andReturn([$queueName, 10, 1]); $mConnection->shouldReceive('getChannel')->once()->andReturn($mChannel); - $this->app->bind(Connection::class, function() use($mConnection) { + $this->app->bind(Connection::class, function () use ($mConnection) { return $mConnection; }); @@ -39,52 +50,65 @@ public function test_checking_consumer_successfully() $this->assertEquals(0, $code); } - public function test_with_0_expected_1_connected() + public function test_with_no_consumers_connected() { // should err out requesting minimum to be greater than 0 $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error]'); - $command->shouldReceive('error')->once()->with('Health check failed. Minimum consumer count not met: expected 0 got 1'); + $command->shouldReceive('error')->once()->with('No consumers connected to queue "queue-to-consume"'); $queueName = 'queue-to-consume'; $this->app['Illuminate\Contracts\Console\Kernel']->registerCommand($command); $mConnection = M::mock(Connection::class); + $mConnection->shouldReceive('fetchQueueConsumers')->once()->andReturn(json_decode(json_encode([]))); + $mChannel = M::mock(AMQPChannel::class); $mChannel::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091'; $mChannel->shouldReceive('queue_declare')->once()->andReturn([$queueName, 10, 1]); $mConnection->shouldReceive('getChannel')->once()->andReturn($mChannel); - $this->app->bind(Connection::class, function() use($mConnection) { + $this->app->bind(Connection::class, function () use ($mConnection) { return $mConnection; }); - $code = $this->artisan('bowler:healthcheck:consumer', ['queueName' => $queueName, '--consumers' => 0]); + $code = $this->artisan('bowler:healthcheck:consumer', ['queueName' => $queueName]); $this->assertEquals(1, $code); } - public function test_with_1_expected_0_connected() + public function test_with_consumer_tag_not_found() { // should err out requesting minimum to be greater than 0 - $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error]'); - $command->shouldReceive('error')->once()->with('Health check failed. Minimum consumer count not met: expected 1 got 0'); + $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error,readConsumerTag]'); $queueName = 'queue-to-consume'; + $consumerTag = 'amqp.98oyiuahksjdf'; + + $command->shouldReceive('error')->once()->with('Health check failed! Could not find consumer with tag "'.$consumerTag.'"'); $this->app['Illuminate\Contracts\Console\Kernel']->registerCommand($command); $mConnection = M::mock(Connection::class); + + $command->shouldReceive('readConsumerTag')->once()->andReturn($consumerTag); + $mConnection->shouldReceive('fetchQueueConsumers')->once()->andReturn(json_decode(json_encode([ + 'consumer_details' => [ + ['consumer_tag' => 'nope-not-me'], + ['consumer_tag' => 'another-tag-here'], + ], + ]))); + $mChannel = M::mock(AMQPChannel::class); $mChannel::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091'; $mChannel->shouldReceive('queue_declare')->once()->andReturn([$queueName, 10, 0]); $mConnection->shouldReceive('getChannel')->once()->andReturn($mChannel); - $this->app->bind(Connection::class, function() use($mConnection) { + $this->app->bind(Connection::class, function () use ($mConnection) { return $mConnection; }); - $code = $this->artisan('bowler:healthcheck:consumer', ['queueName' => $queueName, '--consumers' => 1]); + $code = $this->artisan('bowler:healthcheck:consumer', ['queueName' => $queueName]); $this->assertEquals(1, $code); } @@ -106,7 +130,7 @@ public function test_healthcheck_with_queue_does_not_exist() ->andThrow($exception); $mConnection->shouldReceive('getChannel')->once()->andReturn($mChannel); - $this->app->bind(Connection::class, function() use($mConnection) { + $this->app->bind(Connection::class, function () use ($mConnection) { return $mConnection; });