From 523bef6b8663f1cac65a01db23873a728775b47d Mon Sep 17 00:00:00 2001 From: Abed Halawi Date: Fri, 2 Nov 2018 16:03:00 +0200 Subject: [PATCH 1/4] revamp the health check mechanism Before: we used to check for the number of consumers connected and compare it to an expected number of consumers, if that didn't match the current consumer reports a health problem. This proved to be unreliable as more consumers get connected to the same queue, gradually it starts breaking consumers it shouldn't. Now: The new way of performing health checks consists of checking the current consumer by its tag. When the consumer connects to the queue, RabbitMQ returns the tag generated for this consumer. Once done, Bowler saves this tag to a file in the app's storage path and uses the Management Plugin to fetch the list of connected consumers to the indicated queue and checks for this consumer's tag among the list. --- README.md | 6 +- composer.json | 4 +- src/Connection.php | 58 +++++++++++++++++++ src/Console/Commands/ConsumeCommand.php | 30 +++++++--- .../Commands/ConsumerHealthCheckCommand.php | 38 ++++++++---- src/Consumer.php | 6 +- src/Traits/ConsumerTagTrait.php | 26 +++++++++ .../ConsumerHealthCheckCommandTest.php | 58 +++++++++++++------ 8 files changed, 187 insertions(+), 39 deletions(-) create mode 100644 src/Traits/ConsumerTagTrait.php diff --git a/README.md b/README.md index 5c80dbb..fa6779c 100644 --- a/README.md +++ b/README.md @@ -358,13 +358,13 @@ To do so, the default laravel exception handler normaly located in `app\Exceptio ### Health Checks +**IMPORTANT: Management plugin is required to be installed in order to perform health checks.** + Based on [this Reliability Guide](https://www.rabbitmq.com/reliability.html), Bowler figured that it would be beneficial to provide a tool to check the health of connected consumers and is provided through the `bowler:healthcheck:consumer` command with the following signature: ``` -bowler:healthcheck:consumer - {queueName : The queue name} - {--c|consumers=1 : The expected number of consumers to be connected to the queue specified by queueName} +bowler:healthcheck:consumer {queueName : The queue name} ``` Example: `php artisan bowler:healthcheck:consumer the-queue` diff --git a/composer.json b/composer.json index 8cdeac0..4678f6b 100644 --- a/composer.json +++ b/composer.json @@ -27,7 +27,9 @@ "php": ">=7.0", "php-amqplib/php-amqplib": "v2.6.1", "illuminate/console": "5.*", - "illuminate/support": "5.*" + "illuminate/support": "5.*", + "vinelab/http": "^1.5", + "illuminate/filesystem": "5.x" }, "require-dev": { diff --git a/src/Connection.php b/src/Connection.php index 0774e46..841ccf2 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -5,6 +5,7 @@ define('__ROOT__', dirname(dirname(dirname(__FILE__)))); //require_once(__ROOT__.'/vendor/autoload.php'); +use Vinelab\Http\Client as HTTPClient; use PhpAmqpLib\Connection\AMQPStreamConnection; /** @@ -29,6 +30,34 @@ class Connection */ private $channel; + /** + * RabbitMQ server host. + * + * @var string + */ + private $host; + + /** + * Management plugin's port. + * + * @var int + */ + private $managementPort = 15672; + + /** + * RabbitMQ server username. + * + * @var string + */ + private $username; + + /** + * RabbitMQ server password. + * + * @var string + */ + private $password; + /** * @param string $host the ip of the rabbitmq server, default: localhost * @param int $port. default: 5672 @@ -37,6 +66,11 @@ class Connection */ public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest') { + $this->host = $host; + $this->poart = $port; + $this->username = $username; + $this->password = $password; + $this->connection = new AMQPStreamConnection( $host, $port, @@ -67,6 +101,30 @@ public function getChannel() return $this->channel; } + /** + * Fetch the list of consumers details for the given queue name using the management API. + * + * @param string $queueName + * @param string $columns + * + * @return array + */ + public function fetchQueueConsumers($queueName, string $columns = 'consumer_details.consumer_tag') + { + $http = app(HTTPClient::class); + + $request = [ + 'url' => $this->host.':'.$this->managementPort.'/api/queues/%2F/'.$queueName, + 'params' => ['columns' => $columns], + 'auth' => [ + 'username' => $this->username, + 'password' => $this->password, + ], + ]; + + return $http->get($request)->json(); + } + public function __destruct() { $this->channel->close(); diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 0e2547d..73aa0a7 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -2,12 +2,13 @@ namespace Vinelab\Bowler\Console\Commands; -use Vinelab\Bowler\Consumer; -use Vinelab\Bowler\Connection; use Illuminate\Console\Command; -use Vinelab\Bowler\Facades\Registrator; -use Vinelab\Bowler\Exceptions\UnregisteredQueueException; +use Vinelab\Bowler\Connection; +use Vinelab\Bowler\Consumer; use Vinelab\Bowler\Exceptions\Handler as BowlerExceptionHandler; +use Vinelab\Bowler\Exceptions\UnregisteredQueueException; +use Vinelab\Bowler\Facades\Registrator; +use Vinelab\Bowler\RegisterQueues; /** * @author Ali Issa @@ -17,9 +18,11 @@ class ConsumeCommand extends Command { protected $registerQueues; - public function __construct() + public function __construct(RegisterQueues $registrator) { parent::__construct(); + + $this->registrator = $registrator; } /** @@ -70,8 +73,8 @@ public function handle() $deadLetterRoutingKey = $this->option('deadLetterRoutingKey'); $messageTTL = ($ttl = $this->option('messageTTL')) ? (int) $ttl : null; - require app_path().'/Messaging/queues.php'; - $handlers = Registrator::getHandlers(); + $this->loadQueuesDefinitions(); + $handlers = $this->registrator->getHandlers(); foreach ($handlers as $handler) { if ($handler->queueName == $queueName) { @@ -83,6 +86,7 @@ public function handle() } $bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete); + if ($deadLetterQueueName) { // If configured as options and deadLetterExchangeName is not specified, default to deadLetterQueueName. @@ -90,10 +94,22 @@ public function handle() $bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTTL); } + $bowlerConsumer->listenToQueue($handler->className, app(BowlerExceptionHandler::class)); } } throw new UnregisteredQueueException('No registered queue found with name '.$queueName.'.'); } + + public function loadQueuesDefinitions() + { + $path = app_path().'/Messaging/queues.php'; + + if (!file_exists($path)) { + return $this->error('Queues definitions file not found. Please create it at '.$path); + } + + require $path; + } } diff --git a/src/Console/Commands/ConsumerHealthCheckCommand.php b/src/Console/Commands/ConsumerHealthCheckCommand.php index 52aee09..2959a77 100644 --- a/src/Console/Commands/ConsumerHealthCheckCommand.php +++ b/src/Console/Commands/ConsumerHealthCheckCommand.php @@ -3,30 +3,32 @@ namespace Vinelab\Bowler\Console\Commands; use ErrorException; -use Vinelab\Bowler\Connection; use Illuminate\Console\Command; use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use Vinelab\Bowler\Connection; +use Vinelab\Bowler\Traits\ConsumerTagTrait; /** * @author Abed Halawi */ class ConsumerHealthCheckCommand extends Command { + use ConsumerTagTrait; + /** * The console command name. * * @var string */ protected $signature = 'bowler:healthcheck:consumer - {queueName : The queue name} - {--c|consumers=1 : The expected number of consumers to be connected to the queue specified by queueName}'; + {queueName : The queue name}'; /** * The console command description. * * @var string */ - protected $description = 'Check the health of connected consumers to a queue, with a minimum of 1 connection.'; + protected $description = 'Check the health of connected consumers to a given queue.'; /** * Run the command. @@ -34,13 +36,13 @@ class ConsumerHealthCheckCommand extends Command public function handle() { $queueName = $this->argument('queueName'); - $expectedConsumers = (int) $this->option('consumers'); // may or may not be able to connect try { $connection = app(Connection::class); } catch (ErrorException $e) { $this->error('Unable to connect to RabbitMQ.'); + return 1; } @@ -56,15 +58,31 @@ public function handle() [] ); - // consumer count and minimum consumers connected should match - if ($consumerCount !== $expectedConsumers) { - $this->error('Health check failed. Minimum consumer count not met: expected '.$expectedConsumers.' got '.$consumerCount); + $response = $connection->fetchQueueConsumers($queueName); + + if ($response && isset($response->consumer_details) && !empty($response->consumer_details)) { + // read consumer tag + $tag = $this->readConsumerTag(); + + // find consumer tag within the list of returned consumers + foreach ($response->consumer_details as $consumer) { + if (isset($consumer->consumer_tag) && $consumer->consumer_tag == $tag) { + $this->info('Healthy consumer with tag '.$tag); + + return 0; + } + } + + $this->error('Health check failed! Could not find consumer with tag "'.$tag.'"'); + return 1; } - $this->info('Consumers healthy with '.$consumerCount.' live connections.'); + $this->error('No consumers connected to queue "'.$queueName.'"'); + + return 1; } catch (AMQPProtocolChannelException $e) { - switch($e->getCode()) { + switch ($e->getCode()) { case 404: $this->error('Queue with name '.$queueName.' does not exist.'); break; diff --git a/src/Consumer.php b/src/Consumer.php index 6c0bebc..2d90a6d 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -3,6 +3,7 @@ namespace Vinelab\Bowler; use Vinelab\Bowler\Traits\AdminTrait; +use Vinelab\Bowler\Traits\ConsumerTagTrait; use Vinelab\Bowler\Traits\DeadLetteringTrait; use Vinelab\Bowler\Traits\CompileParametersTrait; use Vinelab\Bowler\Exceptions\Handler as BowlerExceptionHandler; @@ -16,6 +17,7 @@ class Consumer { use AdminTrait; + use ConsumerTagTrait; use DeadLetteringTrait; use CompileParametersTrait; @@ -156,7 +158,9 @@ public function listenToQueue($handlerClass, BowlerExceptionHandler $exceptionHa }; $channel->basic_qos(null, 1, null); - $channel->basic_consume($this->queueName, '', false, false, false, false, $callback); + $tag = $channel->basic_consume($this->queueName, '', false, false, false, false, $callback); + + $this->writeConsumerTag($tag); echo ' [*] Listening to Queue: ', $this->queueName, ' To exit press CTRL+C', "\n"; diff --git a/src/Traits/ConsumerTagTrait.php b/src/Traits/ConsumerTagTrait.php new file mode 100644 index 0000000..64b57f7 --- /dev/null +++ b/src/Traits/ConsumerTagTrait.php @@ -0,0 +1,26 @@ + + */ +trait ConsumerTagTrait +{ + public function getConsumerTagFilePath() + { + return storage_path().'/app/rabbitmq-consumer.tag'; + } + + private function writeConsumerTag($tag) + { + Storage::disk('local')->put($this->getConsumerTagFilePath(), $tag); + } + + public function readConsumerTag() + { + return Storage::disk('local')->get($this->getConsumerTagFilePath()); + } +} diff --git a/tests/Console/Commands/ConsumerHealthCheckCommandTest.php b/tests/Console/Commands/ConsumerHealthCheckCommandTest.php index ebb347e..adf8228 100644 --- a/tests/Console/Commands/ConsumerHealthCheckCommandTest.php +++ b/tests/Console/Commands/ConsumerHealthCheckCommandTest.php @@ -3,12 +3,14 @@ namespace Vinelab\Bowler\Tests\Console\Commands; use Mockery as M; -use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; use Vinelab\Bowler\Connection; -use Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand; use Vinelab\Bowler\Tests\TestCase; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; +/** + * @author Abed Halawi + */ class ConsumerHealthCheckCommandTest extends TestCase { public function tearDown() @@ -18,19 +20,28 @@ public function tearDown() public function test_checking_consumer_successfully() { - $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error]'); - - $queueName = 'queue-to-consume'; + $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error,readConsumerTag]'); + $queueName = 'the-queue'; + $consumerTag = 'tag-1234'; $this->app['Illuminate\Contracts\Console\Kernel']->registerCommand($command); $mConnection = M::mock(Connection::class); + + $command->shouldReceive('readConsumerTag')->once()->andReturn($consumerTag); + $mConnection->shouldReceive('fetchQueueConsumers')->once()->andReturn(json_decode(json_encode([ + 'consumer_details' => [ + ['consumer_tag' => $consumerTag], + ['consumer_tag' => 'another-tag-here'], + ], + ]))); + $mChannel = M::mock(AMQPChannel::class); $mChannel::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091'; $mChannel->shouldReceive('queue_declare')->once()->andReturn([$queueName, 10, 1]); $mConnection->shouldReceive('getChannel')->once()->andReturn($mChannel); - $this->app->bind(Connection::class, function() use($mConnection) { + $this->app->bind(Connection::class, function () use ($mConnection) { return $mConnection; }); @@ -39,52 +50,65 @@ public function test_checking_consumer_successfully() $this->assertEquals(0, $code); } - public function test_with_0_expected_1_connected() + public function test_with_no_consumers_connected() { // should err out requesting minimum to be greater than 0 $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error]'); - $command->shouldReceive('error')->once()->with('Health check failed. Minimum consumer count not met: expected 0 got 1'); + $command->shouldReceive('error')->once()->with('No consumers connected to queue "queue-to-consume"'); $queueName = 'queue-to-consume'; $this->app['Illuminate\Contracts\Console\Kernel']->registerCommand($command); $mConnection = M::mock(Connection::class); + $mConnection->shouldReceive('fetchQueueConsumers')->once()->andReturn(json_decode(json_encode([]))); + $mChannel = M::mock(AMQPChannel::class); $mChannel::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091'; $mChannel->shouldReceive('queue_declare')->once()->andReturn([$queueName, 10, 1]); $mConnection->shouldReceive('getChannel')->once()->andReturn($mChannel); - $this->app->bind(Connection::class, function() use($mConnection) { + $this->app->bind(Connection::class, function () use ($mConnection) { return $mConnection; }); - $code = $this->artisan('bowler:healthcheck:consumer', ['queueName' => $queueName, '--consumers' => 0]); + $code = $this->artisan('bowler:healthcheck:consumer', ['queueName' => $queueName]); $this->assertEquals(1, $code); } - public function test_with_1_expected_0_connected() + public function test_with_consumer_tag_not_found() { // should err out requesting minimum to be greater than 0 - $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error]'); - $command->shouldReceive('error')->once()->with('Health check failed. Minimum consumer count not met: expected 1 got 0'); + $command = M::mock('Vinelab\Bowler\Console\Commands\ConsumerHealthCheckCommand[error,readConsumerTag]'); $queueName = 'queue-to-consume'; + $consumerTag = 'amqp.98oyiuahksjdf'; + + $command->shouldReceive('error')->once()->with('Health check failed! Could not find consumer with tag "'.$consumerTag.'"'); $this->app['Illuminate\Contracts\Console\Kernel']->registerCommand($command); $mConnection = M::mock(Connection::class); + + $command->shouldReceive('readConsumerTag')->once()->andReturn($consumerTag); + $mConnection->shouldReceive('fetchQueueConsumers')->once()->andReturn(json_decode(json_encode([ + 'consumer_details' => [ + ['consumer_tag' => 'nope-not-me'], + ['consumer_tag' => 'another-tag-here'], + ], + ]))); + $mChannel = M::mock(AMQPChannel::class); $mChannel::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091'; $mChannel->shouldReceive('queue_declare')->once()->andReturn([$queueName, 10, 0]); $mConnection->shouldReceive('getChannel')->once()->andReturn($mChannel); - $this->app->bind(Connection::class, function() use($mConnection) { + $this->app->bind(Connection::class, function () use ($mConnection) { return $mConnection; }); - $code = $this->artisan('bowler:healthcheck:consumer', ['queueName' => $queueName, '--consumers' => 1]); + $code = $this->artisan('bowler:healthcheck:consumer', ['queueName' => $queueName]); $this->assertEquals(1, $code); } @@ -106,7 +130,7 @@ public function test_healthcheck_with_queue_does_not_exist() ->andThrow($exception); $mConnection->shouldReceive('getChannel')->once()->andReturn($mChannel); - $this->app->bind(Connection::class, function() use($mConnection) { + $this->app->bind(Connection::class, function () use ($mConnection) { return $mConnection; }); From e06cc2fcd72134ad7035304e94646af4ef0a0a32 Mon Sep 17 00:00:00 2001 From: Abed Halawi Date: Tue, 6 Nov 2018 19:53:18 +0200 Subject: [PATCH 2/4] add Connection::fetchQueueConsumers test with default params --- src/Connection.php | 6 ++--- tests/ConnectionTest.php | 49 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 tests/ConnectionTest.php diff --git a/src/Connection.php b/src/Connection.php index 841ccf2..6309102 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -35,7 +35,7 @@ class Connection * * @var string */ - private $host; + private $host = 'localhost'; /** * Management plugin's port. @@ -49,14 +49,14 @@ class Connection * * @var string */ - private $username; + private $username = 'guest'; /** * RabbitMQ server password. * * @var string */ - private $password; + private $password = 'guest'; /** * @param string $host the ip of the rabbitmq server, default: localhost diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php new file mode 100644 index 0000000..098519a --- /dev/null +++ b/tests/ConnectionTest.php @@ -0,0 +1,49 @@ + + */ +class ConnectionTest extends TestCase +{ + public function tearDown() + { + M::close(); + } + + public function test_fetching_consumers_default() + { + $queueName = 'the-queue'; + $mClient = M::mock(HTTPClient::class); + $request = [ + 'url' => 'localhost:15672/api/queues/%2F/'.$queueName, + 'params' => ['columns' => 'consumer_details.consumer_tag'], + 'auth' => [ + 'username' => 'guest', + 'password' => 'guest', + ], + ]; + + $mClient->shouldReceive('get')->once()->with($request)->andReturn($mClient); + $mClient->shouldReceive('json')->once()->withNoArgs()->andReturn('response'); + + $this->app->bind(HTTPClient::class, function () use ($mClient) { + return $mClient; + }); + + $mConnection = M::mock(Connection::class)->makePartial(); + $this->app->bind(Connection::class, function () use ($mConnection) { + return $mConnection; + }); + + $connection = $this->app[Connection::class]; + $response = $connection->fetchQueueConsumers($queueName); + + $this->assertEquals('response', $response); + } +} From fc7b7b69ea229767104c74abe8b3b95dfad2b824 Mon Sep 17 00:00:00 2001 From: Abed Halawi Date: Tue, 6 Nov 2018 20:34:06 +0200 Subject: [PATCH 3/4] use the same registrator instance all over the app (as a singleton) --- src/BowlerServiceProvider.php | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/BowlerServiceProvider.php b/src/BowlerServiceProvider.php index 647b10b..79de40e 100644 --- a/src/BowlerServiceProvider.php +++ b/src/BowlerServiceProvider.php @@ -19,10 +19,16 @@ class BowlerServiceProvider extends ServiceProvider */ public function register() { + // register facade to resolve instance $this->app->singleton('vinelab.bowler.registrator', function ($app) { return new RegisterQueues(); }); + // use the same Registrator instance all over the app (to make it injectable). + $this->app->singleton(RegisterQueues::class, function ($app) { + return $app['vinelab.bowler.registrator']; + }); + // Bind connection to env configuration $rbmqHost = config('queue.connections.rabbitmq.host'); $rbmqPort = config('queue.connections.rabbitmq.port'); From 3a4291f254eb636d25d860110715204c28cc8239 Mon Sep 17 00:00:00 2001 From: Abed Halawi Date: Tue, 6 Nov 2018 20:57:34 +0200 Subject: [PATCH 4/4] change file location no need to specify the full path when using `Storage::disk('local')`, it will automatically pick up the storage's app path. --- src/Traits/ConsumerTagTrait.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Traits/ConsumerTagTrait.php b/src/Traits/ConsumerTagTrait.php index 64b57f7..4f0e63e 100644 --- a/src/Traits/ConsumerTagTrait.php +++ b/src/Traits/ConsumerTagTrait.php @@ -9,18 +9,18 @@ */ trait ConsumerTagTrait { - public function getConsumerTagFilePath() + public function getConsumerTagFilename() { - return storage_path().'/app/rabbitmq-consumer.tag'; + return 'rabbitmq-consumer.tag'; } private function writeConsumerTag($tag) { - Storage::disk('local')->put($this->getConsumerTagFilePath(), $tag); + Storage::disk('local')->put($this->getConsumerTagFilename(), $tag); } public function readConsumerTag() { - return Storage::disk('local')->get($this->getConsumerTagFilePath()); + return Storage::disk('local')->get($this->getConsumerTagFilename()); } }