Skip to content

Commit

Permalink
Merge pull request #33 from Vinelab/improve/healthcheck
Browse files Browse the repository at this point in the history
Revamp the health check mechanism
  • Loading branch information
Mulkave authored Nov 7, 2018
2 parents 467be7e + 3a4291f commit d148f55
Show file tree
Hide file tree
Showing 10 changed files with 242 additions and 39 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,13 @@ To do so, the default laravel exception handler normaly located in `app\Exceptio

### Health Checks

**IMPORTANT: Management plugin is required to be installed in order to perform health checks.**

Based on [this Reliability Guide](https://www.rabbitmq.com/reliability.html), Bowler figured that it would be beneficial to provide
a tool to check the health of connected consumers and is provided through the `bowler:healthcheck:consumer` command with the following signature:

```
bowler:healthcheck:consumer
{queueName : The queue name}
{--c|consumers=1 : The expected number of consumers to be connected to the queue specified by queueName}
bowler:healthcheck:consumer {queueName : The queue name}
```

Example: `php artisan bowler:healthcheck:consumer the-queue`
Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
"php": ">=7.0",
"php-amqplib/php-amqplib": "v2.6.1",
"illuminate/console": "5.*",
"illuminate/support": "5.*"
"illuminate/support": "5.*",
"vinelab/http": "^1.5",
"illuminate/filesystem": "5.x"
},

"require-dev": {
Expand Down
6 changes: 6 additions & 0 deletions src/BowlerServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ class BowlerServiceProvider extends ServiceProvider
*/
public function register()
{
// register facade to resolve instance
$this->app->singleton('vinelab.bowler.registrator', function ($app) {
return new RegisterQueues();
});

// use the same Registrator instance all over the app (to make it injectable).
$this->app->singleton(RegisterQueues::class, function ($app) {
return $app['vinelab.bowler.registrator'];
});

// Bind connection to env configuration
$rbmqHost = config('queue.connections.rabbitmq.host');
$rbmqPort = config('queue.connections.rabbitmq.port');
Expand Down
58 changes: 58 additions & 0 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
define('__ROOT__', dirname(dirname(dirname(__FILE__))));
//require_once(__ROOT__.'/vendor/autoload.php');

use Vinelab\Http\Client as HTTPClient;
use PhpAmqpLib\Connection\AMQPStreamConnection;

/**
Expand All @@ -29,6 +30,34 @@ class Connection
*/
private $channel;

/**
* RabbitMQ server host.
*
* @var string
*/
private $host = 'localhost';

/**
* Management plugin's port.
*
* @var int
*/
private $managementPort = 15672;

/**
* RabbitMQ server username.
*
* @var string
*/
private $username = 'guest';

/**
* RabbitMQ server password.
*
* @var string
*/
private $password = 'guest';

/**
* @param string $host the ip of the rabbitmq server, default: localhost
* @param int $port. default: 5672
Expand All @@ -37,6 +66,11 @@ class Connection
*/
public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest')
{
$this->host = $host;
$this->poart = $port;
$this->username = $username;
$this->password = $password;

$this->connection = new AMQPStreamConnection(
$host,
$port,
Expand Down Expand Up @@ -67,6 +101,30 @@ public function getChannel()
return $this->channel;
}

/**
* Fetch the list of consumers details for the given queue name using the management API.
*
* @param string $queueName
* @param string $columns
*
* @return array
*/
public function fetchQueueConsumers($queueName, string $columns = 'consumer_details.consumer_tag')
{
$http = app(HTTPClient::class);

$request = [
'url' => $this->host.':'.$this->managementPort.'/api/queues/%2F/'.$queueName,
'params' => ['columns' => $columns],
'auth' => [
'username' => $this->username,
'password' => $this->password,
],
];

return $http->get($request)->json();
}

public function __destruct()
{
$this->channel->close();
Expand Down
30 changes: 23 additions & 7 deletions src/Console/Commands/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

namespace Vinelab\Bowler\Console\Commands;

use Vinelab\Bowler\Consumer;
use Vinelab\Bowler\Connection;
use Illuminate\Console\Command;
use Vinelab\Bowler\Facades\Registrator;
use Vinelab\Bowler\Exceptions\UnregisteredQueueException;
use Vinelab\Bowler\Connection;
use Vinelab\Bowler\Consumer;
use Vinelab\Bowler\Exceptions\Handler as BowlerExceptionHandler;
use Vinelab\Bowler\Exceptions\UnregisteredQueueException;
use Vinelab\Bowler\Facades\Registrator;
use Vinelab\Bowler\RegisterQueues;

/**
* @author Ali Issa <[email protected]>
Expand All @@ -17,9 +18,11 @@ class ConsumeCommand extends Command
{
protected $registerQueues;

public function __construct()
public function __construct(RegisterQueues $registrator)
{
parent::__construct();

$this->registrator = $registrator;
}

/**
Expand Down Expand Up @@ -70,8 +73,8 @@ public function handle()
$deadLetterRoutingKey = $this->option('deadLetterRoutingKey');
$messageTTL = ($ttl = $this->option('messageTTL')) ? (int) $ttl : null;

require app_path().'/Messaging/queues.php';
$handlers = Registrator::getHandlers();
$this->loadQueuesDefinitions();
$handlers = $this->registrator->getHandlers();

foreach ($handlers as $handler) {
if ($handler->queueName == $queueName) {
Expand All @@ -83,17 +86,30 @@ public function handle()
}

$bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete);

if ($deadLetterQueueName) {

// If configured as options and deadLetterExchangeName is not specified, default to deadLetterQueueName.
$deadLetterExchangeName = isset($deadLetterExchangeName) ? $deadLetterExchangeName : $deadLetterQueueName;

$bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTTL);
}

$bowlerConsumer->listenToQueue($handler->className, app(BowlerExceptionHandler::class));
}
}

throw new UnregisteredQueueException('No registered queue found with name '.$queueName.'.');
}

public function loadQueuesDefinitions()
{
$path = app_path().'/Messaging/queues.php';

if (!file_exists($path)) {
return $this->error('Queues definitions file not found. Please create it at '.$path);
}

require $path;
}
}
38 changes: 28 additions & 10 deletions src/Console/Commands/ConsumerHealthCheckCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,46 @@
namespace Vinelab\Bowler\Console\Commands;

use ErrorException;
use Vinelab\Bowler\Connection;
use Illuminate\Console\Command;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use Vinelab\Bowler\Connection;
use Vinelab\Bowler\Traits\ConsumerTagTrait;

/**
* @author Abed Halawi <[email protected]>
*/
class ConsumerHealthCheckCommand extends Command
{
use ConsumerTagTrait;

/**
* The console command name.
*
* @var string
*/
protected $signature = 'bowler:healthcheck:consumer
{queueName : The queue name}
{--c|consumers=1 : The expected number of consumers to be connected to the queue specified by queueName}';
{queueName : The queue name}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Check the health of connected consumers to a queue, with a minimum of 1 connection.';
protected $description = 'Check the health of connected consumers to a given queue.';

/**
* Run the command.
*/
public function handle()
{
$queueName = $this->argument('queueName');
$expectedConsumers = (int) $this->option('consumers');

// may or may not be able to connect
try {
$connection = app(Connection::class);
} catch (ErrorException $e) {
$this->error('Unable to connect to RabbitMQ.');

return 1;
}

Expand All @@ -56,15 +58,31 @@ public function handle()
[]
);

// consumer count and minimum consumers connected should match
if ($consumerCount !== $expectedConsumers) {
$this->error('Health check failed. Minimum consumer count not met: expected '.$expectedConsumers.' got '.$consumerCount);
$response = $connection->fetchQueueConsumers($queueName);

if ($response && isset($response->consumer_details) && !empty($response->consumer_details)) {
// read consumer tag
$tag = $this->readConsumerTag();

// find consumer tag within the list of returned consumers
foreach ($response->consumer_details as $consumer) {
if (isset($consumer->consumer_tag) && $consumer->consumer_tag == $tag) {
$this->info('Healthy consumer with tag '.$tag);

return 0;
}
}

$this->error('Health check failed! Could not find consumer with tag "'.$tag.'"');

return 1;
}

$this->info('Consumers healthy with '.$consumerCount.' live connections.');
$this->error('No consumers connected to queue "'.$queueName.'"');

return 1;
} catch (AMQPProtocolChannelException $e) {
switch($e->getCode()) {
switch ($e->getCode()) {
case 404:
$this->error('Queue with name '.$queueName.' does not exist.');
break;
Expand Down
6 changes: 5 additions & 1 deletion src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Vinelab\Bowler;

use Vinelab\Bowler\Traits\AdminTrait;
use Vinelab\Bowler\Traits\ConsumerTagTrait;
use Vinelab\Bowler\Traits\DeadLetteringTrait;
use Vinelab\Bowler\Traits\CompileParametersTrait;
use Vinelab\Bowler\Exceptions\Handler as BowlerExceptionHandler;
Expand All @@ -16,6 +17,7 @@
class Consumer
{
use AdminTrait;
use ConsumerTagTrait;
use DeadLetteringTrait;
use CompileParametersTrait;

Expand Down Expand Up @@ -156,7 +158,9 @@ public function listenToQueue($handlerClass, BowlerExceptionHandler $exceptionHa
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
$tag = $channel->basic_consume($this->queueName, '', false, false, false, false, $callback);

$this->writeConsumerTag($tag);

echo ' [*] Listening to Queue: ', $this->queueName, ' To exit press CTRL+C', "\n";

Expand Down
26 changes: 26 additions & 0 deletions src/Traits/ConsumerTagTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace Vinelab\Bowler\Traits;

use Storage;

/**
* @author Abed Halawi <[email protected]>
*/
trait ConsumerTagTrait
{
public function getConsumerTagFilename()
{
return 'rabbitmq-consumer.tag';
}

private function writeConsumerTag($tag)
{
Storage::disk('local')->put($this->getConsumerTagFilename(), $tag);
}

public function readConsumerTag()
{
return Storage::disk('local')->get($this->getConsumerTagFilename());
}
}
49 changes: 49 additions & 0 deletions tests/ConnectionTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Vinelab\Bowler\Tests;

use Mockery as M;
use Vinelab\Bowler\Connection;
use Vinelab\Http\Client as HTTPClient;

/**
* @author Abed Halawi <[email protected]>
*/
class ConnectionTest extends TestCase
{
public function tearDown()
{
M::close();
}

public function test_fetching_consumers_default()
{
$queueName = 'the-queue';
$mClient = M::mock(HTTPClient::class);
$request = [
'url' => 'localhost:15672/api/queues/%2F/'.$queueName,
'params' => ['columns' => 'consumer_details.consumer_tag'],
'auth' => [
'username' => 'guest',
'password' => 'guest',
],
];

$mClient->shouldReceive('get')->once()->with($request)->andReturn($mClient);
$mClient->shouldReceive('json')->once()->withNoArgs()->andReturn('response');

$this->app->bind(HTTPClient::class, function () use ($mClient) {
return $mClient;
});

$mConnection = M::mock(Connection::class)->makePartial();
$this->app->bind(Connection::class, function () use ($mConnection) {
return $mConnection;
});

$connection = $this->app[Connection::class];
$response = $connection->fetchQueueConsumers($queueName);

$this->assertEquals('response', $response);
}
}
Loading

0 comments on commit d148f55

Please sign in to comment.