From 04404f9fc8447a555aee70582abb75c812a69eb3 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 15:41:56 +0200 Subject: [PATCH 01/22] updated doc comments for method --- RabbitMq/Consumer.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/RabbitMq/Consumer.php b/RabbitMq/Consumer.php index 423e3e81..a704494c 100644 --- a/RabbitMq/Consumer.php +++ b/RabbitMq/Consumer.php @@ -38,7 +38,11 @@ public function getMemoryLimit() /** * Consume the message * - * @param int $msgAmount + * @param int $msgAmount + * + * @return int + * + * @throws AMQPTimeoutException */ public function consume($msgAmount) { From 27b01a424ba45a918301d5e9577f15b59a2198e8 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 15:42:30 +0200 Subject: [PATCH 02/22] added schema onfiguration for batch_consumer --- DependencyInjection/Configuration.php | 37 +++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 02d6e411..b5fe3183 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -33,6 +33,7 @@ public function getConfigTreeBuilder() $this->addConsumers($rootNode); $this->addMultipleConsumers($rootNode); $this->addDynamicConsumers($rootNode); + $this->addBatchConsumers($rootNode); $this->addAnonConsumers($rootNode); $this->addRpcClients($rootNode); $this->addRpcServers($rootNode); @@ -220,6 +221,42 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node) ; } + /** + * @param ArrayNodeDefinition $node + * + * @return void + */ + protected function addBatchConsumers(ArrayNodeDefinition $node) + { + $node + ->children() + ->arrayNode('batch_consumers') + ->canBeUnset() + ->useAttributeAsKey('key') + ->prototype('array') + ->append($this->getExchangeConfiguration()) + ->children() + ->scalarNode('connection')->defaultValue('default')->end() + ->scalarNode('callback')->isRequired()->end() + ->scalarNode('idle_timeout')->end() + ->scalarNode('timeout_wait')->defaultValue(3)->end() + ->scalarNode('idle_timeout_exit_code')->end() + ->scalarNode('auto_setup_fabric')->defaultTrue()->end() + ->arrayNode('qos_options') + ->children() + ->scalarNode('prefetch_size')->defaultValue(0)->end() + ->scalarNode('prefetch_count')->defaultValue(2)->end() + ->booleanNode('global')->defaultFalse()->end() + ->end() + ->end() + ->scalarNode('enable_logger')->defaultFalse()->end() + ->end() + ->end() + ->end() + ->end() + ; + } + protected function addAnonConsumers(ArrayNodeDefinition $node) { $node From 39951b107f14d63848ffe4664308936edfcfcf10 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 15:45:26 +0200 Subject: [PATCH 03/22] send the configuration name from the extension --- DependencyInjection/Configuration.php | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index b5fe3183..9c8153e3 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -13,11 +13,26 @@ */ class Configuration implements ConfigurationInterface { + /** + * @var string + */ + protected $name; + + /** + * Configuration constructor. + * + * @param string $name + */ + public function __construct($name) + { + $this->name = $name; + } + public function getConfigTreeBuilder() { $tree = new TreeBuilder(); - $rootNode = $tree->root('old_sound_rabbit_mq'); + $rootNode = $tree->root($this->name); $rootNode ->children() From c5b332e41ffc6eeac989de8046af4b1c95737043 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 15:45:49 +0200 Subject: [PATCH 04/22] set the configuration name & load batch consumers build --- .../OldSoundRabbitMqExtension.php | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/DependencyInjection/OldSoundRabbitMqExtension.php b/DependencyInjection/OldSoundRabbitMqExtension.php index 53ae6736..6305b1bc 100644 --- a/DependencyInjection/OldSoundRabbitMqExtension.php +++ b/DependencyInjection/OldSoundRabbitMqExtension.php @@ -41,7 +41,7 @@ public function load(array $configs, ContainerBuilder $container) $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config'))); $loader->load('rabbitmq.xml'); - $configuration = new Configuration(); + $configuration = new Configuration($this->getAlias()); $this->config = $this->processConfiguration($configuration, $configs); $this->collectorEnabled = $this->config['enable_collector']; @@ -52,6 +52,7 @@ public function load(array $configs, ContainerBuilder $container) $this->loadConsumers(); $this->loadMultipleConsumers(); $this->loadDynamicConsumers(); + $this->loadBatchConsumers(); $this->loadAnonConsumers(); $this->loadRpcClients(); $this->loadRpcServers(); @@ -348,6 +349,47 @@ protected function loadDynamicConsumers() } } + protected function loadBatchConsumers() + { + foreach ($this->config['batch_consumers'] as $key => $consumer) { + $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%'); + $definition + ->addTag('old_sound_rabbit_mq.base_amqp') + ->addTag('old_sound_rabbit_mq.consumer') + ->addTag('old_sound_rabbit_mq.batch_consumer') + ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])) + ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count'])) + ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))) + ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute'))) + ->addMethodCall('setQosOptions', array( + $consumer['qos_options']['prefetch_size'], + $consumer['qos_options']['prefetch_count'], + $consumer['qos_options']['global'] + )) + ; + + if (isset($consumer['idle_timeout'])) { + $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout'])); + } + + if (!$consumer['auto_setup_fabric']) { + $definition->addMethodCall('disableAutoSetupFabric'); + } + + $this->injectConnection($definition, $consumer['connection']); + if ($this->collectorEnabled) { + $this->injectLoggedChannel($definition, $key, $consumer['connection']); + } + + if ($consumer['enable_logger']) { + $this->injectLogger($definition); + } + + $name = sprintf('old_sound_rabbit_mq.%s_batch', $key); + $this->container->setDefinition($name, $definition); + } + } + protected function loadAnonConsumers() { foreach ($this->config['anon_consumers'] as $key => $anon) { From 2c173a762f66debad75c7c5f07ecbe7ed66bbd48 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 15:46:09 +0200 Subject: [PATCH 05/22] added batch_consumer class definition --- Resources/config/rabbitmq.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/Resources/config/rabbitmq.xml b/Resources/config/rabbitmq.xml index b6ed55d5..64f9c28a 100644 --- a/Resources/config/rabbitmq.xml +++ b/Resources/config/rabbitmq.xml @@ -14,6 +14,7 @@ OldSound\RabbitMqBundle\RabbitMq\Consumer OldSound\RabbitMqBundle\RabbitMq\MultipleConsumer OldSound\RabbitMqBundle\RabbitMq\DynamicConsumer + OldSound\RabbitMqBundle\RabbitMq\BatchConsumer OldSound\RabbitMqBundle\RabbitMq\AnonConsumer OldSound\RabbitMqBundle\RabbitMq\RpcClient OldSound\RabbitMqBundle\RabbitMq\RpcServer From 1d59ce98b2a1645b83ba38574625cdd791f11ee5 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 15:46:37 +0200 Subject: [PATCH 06/22] initial batch consumer class implementation --- RabbitMq/BatchConsumer.php | 104 +++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 RabbitMq/BatchConsumer.php diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php new file mode 100644 index 00000000..79859ddd --- /dev/null +++ b/RabbitMq/BatchConsumer.php @@ -0,0 +1,104 @@ +target = $msgAmount; + + $this->setupConsumer(); + + $isConsuming = false; + $timeoutWanted = $this->getTimeoutWait(); + while (count($this->getChannel()->callbacks)) { + $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this)); + $this->maybeStopConsumer(); + if (!$this->forceStop) { + try { + $this->consumeMessage($timeoutWanted); + $isConsuming = true; + } catch (AMQPTimeoutException $e) { + if ($isConsuming) { + $isConsuming = false; + } elseif (null !== $this->getIdleTimeoutExitCode()) { + return $this->getIdleTimeoutExitCode(); + } else { + throw $e; + } + } + } + + $timeoutWanted = ($isConsuming) ? $this->getTimeoutWait() : $this->getIdleTimeout(); + } + } + + /** + * @param int $timeout + * + * @return $this + */ + public function setTimeoutWait($timeout) + { + $this->timeoutWait = $timeout; + + return $this; + } + + /** + * @param int $amount + * + * @return $this + */ + public function setPrefetchCount($amount) + { + $this->prefetchCount = $amount; + + return $this; + } + + /** + * @return int + */ + public function getTimeoutWait() + { + return $this->timeoutWait; + } + + /** + * @return int + */ + public function getPrefetchCount() + { + return $this->prefetchCount; + } + + /** + * @param int $timeout + * + * @return void + * + * @throws AMQPTimeoutException + */ + private function consumeMessage($timeout) + { + $this->getChannel()->wait(null, false, $timeout); + } +} \ No newline at end of file From 80392e1b6a71dd1a92d3b214e8d170a668a4afa0 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 18:26:53 +0200 Subject: [PATCH 07/22] complete the build of batch consumers from extensions --- DependencyInjection/OldSoundRabbitMqExtension.php | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/DependencyInjection/OldSoundRabbitMqExtension.php b/DependencyInjection/OldSoundRabbitMqExtension.php index 6305b1bc..d341ff29 100644 --- a/DependencyInjection/OldSoundRabbitMqExtension.php +++ b/DependencyInjection/OldSoundRabbitMqExtension.php @@ -353,13 +353,19 @@ protected function loadBatchConsumers() { foreach ($this->config['batch_consumers'] as $key => $consumer) { $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%'); + + if (!isset($consumer['exchange_options'])) { + $consumer['exchange_options'] = $this->getDefaultExchangeOptions(); + } + $definition ->addTag('old_sound_rabbit_mq.base_amqp') - ->addTag('old_sound_rabbit_mq.consumer') ->addTag('old_sound_rabbit_mq.batch_consumer') ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])) ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count'])) + ->addMethodCall('setBatchCallback', array(array(new Reference($consumer['callback']), 'batchExecute'))) ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))) + ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options']))) ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute'))) ->addMethodCall('setQosOptions', array( $consumer['qos_options']['prefetch_size'], @@ -385,8 +391,7 @@ protected function loadBatchConsumers() $this->injectLogger($definition); } - $name = sprintf('old_sound_rabbit_mq.%s_batch', $key); - $this->container->setDefinition($name, $definition); + $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition); } } From e2565589692233bc55f8533a0075fd4a8a490052 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 18:27:42 +0200 Subject: [PATCH 08/22] added queue configuration key for configuration --- DependencyInjection/Configuration.php | 1 + 1 file changed, 1 insertion(+) diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 9c8153e3..79080041 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -250,6 +250,7 @@ protected function addBatchConsumers(ArrayNodeDefinition $node) ->useAttributeAsKey('key') ->prototype('array') ->append($this->getExchangeConfiguration()) + ->append($this->getQueueConfiguration()) ->children() ->scalarNode('connection')->defaultValue('default')->end() ->scalarNode('callback')->isRequired()->end() From 16d2ec801b2320829c5db6593205920d51bc8b01 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 18:28:24 +0200 Subject: [PATCH 09/22] keep track of messages and fix implementation for isCompleteBatch --- RabbitMq/BatchConsumer.php | 135 +++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 79859ddd..121c1480 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -4,6 +4,7 @@ use OldSound\RabbitMqBundle\Event\OnConsumeEvent; use PhpAmqpLib\Exception\AMQPTimeoutException; +use PhpAmqpLib\Message\AMQPMessage; class BatchConsumer extends Consumer { @@ -17,6 +18,21 @@ class BatchConsumer extends Consumer */ protected $timeoutWait; + /** + * @var array + */ + protected $messages = array(); + + /** + * @var int + */ + protected $batchCounter = 0; + + /** + * @var \Closure + */ + protected $batchCallback; + /** * @inheritDoc */ @@ -36,6 +52,7 @@ public function consume($msgAmount) $this->consumeMessage($timeoutWanted); $isConsuming = true; } catch (AMQPTimeoutException $e) { + $this->batchConsume(); if ($isConsuming) { $isConsuming = false; } elseif (null !== $this->getIdleTimeoutExitCode()) { @@ -46,10 +63,128 @@ public function consume($msgAmount) } } + if ($this->isCompleteBatch($isConsuming)) { + $this->batchConsume(); + } + $timeoutWanted = ($isConsuming) ? $this->getTimeoutWait() : $this->getIdleTimeout(); } } + /** + * @return void + * + * @throws \Exception + */ + protected function batchConsume() + { + if ($this->batchCounter == 0) { + return; + } + + try { + call_user_func($this->batchCallback); + $this->resetBatch(); + } catch (\Exception $exception) { + $this->resetBatch(true); + throw $exception; + } + } + + /** + * @param bool $isConsuming + * + * @return bool + */ + protected function isCompleteBatch($isConsuming) + { + return $isConsuming && $this->batchCounter != 0 && $this->batchCounter%$this->prefetchCount == 0; + } + + /** + * @inheritDoc + */ + public function stopConsuming() + { + $this->batchConsume(); + + parent::stopConsuming(); + } + + /** + * @inheritDoc + */ + protected function handleProcessMessage(AMQPMessage $msg, $processFlag) + { + $isRejectedOrReQueued = false; + + if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) { + // Reject and requeue message to RabbitMQ + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true); + $isRejectedOrReQueued = true; + } else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) { + // NACK and requeue message to RabbitMQ + $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true); + $isRejectedOrReQueued = true; + } else if ($processFlag === ConsumerInterface::MSG_REJECT) { + // Reject and drop + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); + } + + $this->consumed++; + $this->maybeStopConsumer(); + if (!$isRejectedOrReQueued) { + $this->addDeliveryTag($msg); + } + + if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { + $this->stopConsuming(); + } + } + + /** + * @param bool $hasExceptions + * + * @return void + */ + private function resetBatch($hasExceptions = false) + { + if ($hasExceptions) { + array_map(function(AMQPMessage $msg) { + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true); + }, $this->messages); + } else { + array_map(function(AMQPMessage $msg) { + $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); + }, $this->messages); + } + + $this->messages = array(); + $this->batchCounter = 0; + } + + /** + * @param AMQPMessage $message + * + * @return void + */ + private function addDeliveryTag(AMQPMessage $message) + { + $this->messages[$this->batchCounter++] = $message; + } + + /** + * @param \Closure $callback + * + * @return $this + */ + public function setBatchCallback($callback) + { + $this->batchCallback = $callback; + + return $this; + } + /** * @param int $timeout * From 1cf94dc15b58ac14282759e13fe4f766c3edc3fa Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 18:28:53 +0200 Subject: [PATCH 10/22] created separate command for batch consumers --- Command/BatchConsumerCommand.php | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 Command/BatchConsumerCommand.php diff --git a/Command/BatchConsumerCommand.php b/Command/BatchConsumerCommand.php new file mode 100644 index 00000000..3eb0af86 --- /dev/null +++ b/Command/BatchConsumerCommand.php @@ -0,0 +1,20 @@ +setName('rabbitmq:batch-consumer') + ->setDescription('Executes a batch consumer') + ; + } + + protected function getConsumerService() + { + return 'old_sound_rabbit_mq.%s_batch'; + } +} \ No newline at end of file From 1381d1742866d900422864b75d1d45508c34d1b4 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Dec 2016 18:29:13 +0200 Subject: [PATCH 11/22] created new interface to signal that a callback will have batch processing --- RabbitMq/BatchConsumerInterface.php | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 RabbitMq/BatchConsumerInterface.php diff --git a/RabbitMq/BatchConsumerInterface.php b/RabbitMq/BatchConsumerInterface.php new file mode 100644 index 00000000..3a9417dc --- /dev/null +++ b/RabbitMq/BatchConsumerInterface.php @@ -0,0 +1,8 @@ + Date: Wed, 7 Dec 2016 18:36:10 +0200 Subject: [PATCH 12/22] keep only channel & delivery tag on message --- RabbitMq/BatchConsumer.php | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 121c1480..11e62085 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -134,7 +134,7 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag) $this->consumed++; $this->maybeStopConsumer(); if (!$isRejectedOrReQueued) { - $this->addDeliveryTag($msg); + $this->addMessage($msg); } if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { @@ -150,12 +150,12 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag) private function resetBatch($hasExceptions = false) { if ($hasExceptions) { - array_map(function(AMQPMessage $msg) { - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true); + array_map(function($message) { + $message['channel']->basic_reject($message['tag'], true); }, $this->messages); } else { - array_map(function(AMQPMessage $msg) { - $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); + array_map(function($message) { + $message['channel']->basic_ack($message['tag']); }, $this->messages); } @@ -168,9 +168,12 @@ private function resetBatch($hasExceptions = false) * * @return void */ - private function addDeliveryTag(AMQPMessage $message) + private function addMessage(AMQPMessage $message) { - $this->messages[$this->batchCounter++] = $message; + $this->messages[$this->batchCounter++] = array( + 'channel' => $message->delivery_info['channel'], + 'tag' => $message->delivery_info['delivery_tag'], + ); } /** From 8ee3028b31316ad7458173d7047ca70a66937343 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Sat, 10 Dec 2016 15:36:43 +0200 Subject: [PATCH 13/22] refactored BatchConsumerCommand it can not accept --messages flag because it does the work in batches, could be added later on maybe, but with other meaning (how many batches not how many messages) --- Command/BatchConsumerCommand.php | 99 +++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 3 deletions(-) diff --git a/Command/BatchConsumerCommand.php b/Command/BatchConsumerCommand.php index 3eb0af86..0476aaaf 100644 --- a/Command/BatchConsumerCommand.php +++ b/Command/BatchConsumerCommand.php @@ -2,17 +2,110 @@ namespace OldSound\RabbitMqBundle\Command; -class BatchConsumerCommand extends BaseConsumerCommand +use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer; +use PhpAmqpLib\Exception\AMQPTimeoutException; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; + +class BatchConsumerCommand extends BaseRabbitMqCommand { + /** + * @var BatchConsumer + */ + protected $consumer; + + /** + * @return void + */ + public function stopConsumer() + { + if ($this->consumer instanceof BatchConsumer) { + // Process current message, then halt consumer + $this->consumer->forceStopConsumer(); + + // Halt consumer if waiting for a new message from the queue + try { + $this->consumer->stopConsuming(); + } catch (AMQPTimeoutException $e) {} + } + } + + public function restartConsumer() + { + // TODO: Implement restarting of consumer + } + + protected function configure() { parent::configure(); + $this - ->setName('rabbitmq:batch-consumer') - ->setDescription('Executes a batch consumer') + ->setName('rabbitmq:batch:consumer') + ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name') + ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '') + ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null) + ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging') + ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals') + ->setDescription('Executes a Batch Consumer'); ; } + /** + * Executes the current command. + * + * @param InputInterface $input An InputInterface instance + * @param OutputInterface $output An OutputInterface instance + * + * @return integer 0 if everything went fine, or an error code + * + * @throws \InvalidArgumentException When the number of messages to consume is less than 0 + * @throws \BadFunctionCallException When the pcntl is not installed and option -s is true + */ + protected function execute(InputInterface $input, OutputInterface $output) + { + if (defined('AMQP_WITHOUT_SIGNALS') === false) { + define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals')); + } + + if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) { + if (!function_exists('pcntl_signal')) { + throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called."); + } + + pcntl_signal(SIGTERM, array(&$this, 'stopConsumer')); + pcntl_signal(SIGINT, array(&$this, 'stopConsumer')); +// pcntl_signal(SIGHUP, array(&$this, 'restartConsumer')); + } + + if (defined('AMQP_DEBUG') === false) { + define('AMQP_DEBUG', (bool) $input->getOption('debug')); + } + + $this->initConsumer($input); + + return $this->consumer->consume(); + } + + /** + * @param InputInterface $input + */ + protected function initConsumer(InputInterface $input) + { + $this->consumer = $this->getContainer() + ->get(sprintf($this->getConsumerService(), $input->getArgument('name'))); + + if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) { + $this->consumer->setMemoryLimit($input->getOption('memory-limit')); + } + $this->consumer->setRoutingKey($input->getOption('route')); + } + + /** + * @return string + */ protected function getConsumerService() { return 'old_sound_rabbit_mq.%s_batch'; From 55217073bb1790f9eb5494c2b758d563a81c8e97 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Sat, 10 Dec 2016 15:38:22 +0200 Subject: [PATCH 14/22] refactored the meaning of a batch consumer it now accepts an array of messages and it can return for each message the response ot ack/nack/requeue/reject or 1 single response applied for all. --- RabbitMq/BatchConsumerInterface.php | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/RabbitMq/BatchConsumerInterface.php b/RabbitMq/BatchConsumerInterface.php index 3a9417dc..0b9ec585 100644 --- a/RabbitMq/BatchConsumerInterface.php +++ b/RabbitMq/BatchConsumerInterface.php @@ -2,7 +2,14 @@ namespace OldSound\RabbitMqBundle\RabbitMq; -interface BatchConsumerInterface extends ConsumerInterface +use PhpAmqpLib\Message\AMQPMessage; + +interface BatchConsumerInterface { - public function batchExecute(); -} \ No newline at end of file + /** + * @param AMQPMessage[] $messages + * + * @return array|bool + */ + public function batchExecute(array $messages); +} From bbbac117a7827ebc3282e1baffbdf76f4f5fe4a9 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Sat, 10 Dec 2016 15:38:55 +0200 Subject: [PATCH 15/22] prevent CR comment about new line. --- Command/BatchConsumerCommand.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Command/BatchConsumerCommand.php b/Command/BatchConsumerCommand.php index 0476aaaf..68c65f10 100644 --- a/Command/BatchConsumerCommand.php +++ b/Command/BatchConsumerCommand.php @@ -110,4 +110,4 @@ protected function getConsumerService() { return 'old_sound_rabbit_mq.%s_batch'; } -} \ No newline at end of file +} From c10372141e89fce8dc5f124a34d111ed9a29c836 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Sat, 10 Dec 2016 15:40:56 +0200 Subject: [PATCH 16/22] batch consumer will have just 1 callback that behaves like a regular consumer just has an array of messages instead of a single one --- DependencyInjection/OldSoundRabbitMqExtension.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/DependencyInjection/OldSoundRabbitMqExtension.php b/DependencyInjection/OldSoundRabbitMqExtension.php index d341ff29..38258edc 100644 --- a/DependencyInjection/OldSoundRabbitMqExtension.php +++ b/DependencyInjection/OldSoundRabbitMqExtension.php @@ -363,10 +363,9 @@ protected function loadBatchConsumers() ->addTag('old_sound_rabbit_mq.batch_consumer') ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])) ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count'])) - ->addMethodCall('setBatchCallback', array(array(new Reference($consumer['callback']), 'batchExecute'))) + ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute'))) ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))) ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options']))) - ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute'))) ->addMethodCall('setQosOptions', array( $consumer['qos_options']['prefetch_size'], $consumer['qos_options']['prefetch_count'], From db49a24c73aed7e81ce2d9032ebb4b529148f521 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Sat, 10 Dec 2016 15:44:22 +0200 Subject: [PATCH 17/22] refactored BatchConsumer implementation to collect all the messages and then pass it to the callback --- RabbitMq/BatchConsumer.php | 481 +++++++++++++++++++++++++++++++------ 1 file changed, 407 insertions(+), 74 deletions(-) diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 11e62085..4260ca44 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -2,12 +2,43 @@ namespace OldSound\RabbitMqBundle\RabbitMq; -use OldSound\RabbitMqBundle\Event\OnConsumeEvent; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; -class BatchConsumer extends Consumer +class BatchConsumer extends BaseAmqp implements DequeuerInterface { + /** + * @var int + */ + protected $consumed = 0; + + /** + * @var \Closure|callable + */ + protected $callback; + + /** + * @var bool + */ + protected $forceStop = false; + + /** + * @var int + */ + protected $idleTimeout = 0; + + /** + * @var int + */ + protected $idleTimeoutExitCode; + + /** + * @var int + */ + protected $memoryLimit = null; + /** * @var int */ @@ -16,7 +47,7 @@ class BatchConsumer extends Consumer /** * @var int */ - protected $timeoutWait; + protected $timeoutWait = 3; /** * @var array @@ -29,27 +60,48 @@ class BatchConsumer extends Consumer protected $batchCounter = 0; /** - * @var \Closure + * @param \Closure|callable $callback + * + * @return $this */ - protected $batchCallback; + public function setCallback($callback) + { + $this->callback = $callback; - /** - * @inheritDoc - */ - public function consume($msgAmount) + return $this; + } + + public function start() + { + $this->setupConsumer(); + + while (count($this->getChannel()->callbacks)) { + $this->getChannel()->wait(); + } + } + + public function execute(AMQPMessage $msg) { - $this->target = $msgAmount; + $this->addMessage($msg); + + $this->maybeStopConsumer(); + + if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { + $this->stopConsuming(); + } + } + public function consume() + { $this->setupConsumer(); $isConsuming = false; $timeoutWanted = $this->getTimeoutWait(); while (count($this->getChannel()->callbacks)) { - $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this)); $this->maybeStopConsumer(); if (!$this->forceStop) { try { - $this->consumeMessage($timeoutWanted); + $this->getChannel()->wait(null, false, $timeoutWanted); $isConsuming = true; } catch (AMQPTimeoutException $e) { $this->batchConsume(); @@ -61,6 +113,8 @@ public function consume($msgAmount) throw $e; } } + } else { + $this->batchConsume(); } if ($this->isCompleteBatch($isConsuming)) { @@ -71,94 +125,184 @@ public function consume($msgAmount) } } - /** - * @return void - * - * @throws \Exception - */ - protected function batchConsume() + public function batchConsume() { if ($this->batchCounter == 0) { return; } - try { - call_user_func($this->batchCallback); + try { + $processFlags = call_user_func($this->callback, $this->messages); + $this->handleProcessMessages($processFlags); + $this->logger->debug('Queue message processed', array( + 'amqp' => array( + 'queue' => $this->queueOptions['name'], + 'messages' => $this->messages, + 'return_codes' => $processFlags + ) + )); + } catch (Exception\StopConsumerException $e) { + $this->logger->info('Consumer requested restart', array( + 'amqp' => array( + 'queue' => $this->queueOptions['name'], + 'message' => $this->messages, + 'stacktrace' => $e->getTraceAsString() + ) + )); + $this->stopConsuming(); + } catch (\Exception $e) { + $this->logger->error($e->getMessage(), array( + 'amqp' => array( + 'queue' => $this->queueOptions['name'], + 'message' => $this->messages, + 'stacktrace' => $e->getTraceAsString() + ) + )); + throw $e; + } catch (\Error $e) { + $this->logger->error($e->getMessage(), array( + 'amqp' => array( + 'queue' => $this->queueOptions['name'], + 'message' => $this->messages, + 'stacktrace' => $e->getTraceAsString() + ) + )); + throw $e; + } finally { $this->resetBatch(); - } catch (\Exception $exception) { - $this->resetBatch(true); - throw $exception; } } /** - * @param bool $isConsuming + * @param mixed $processFlags * - * @return bool + * @return void */ - protected function isCompleteBatch($isConsuming) + protected function handleProcessMessages($processFlags = null) { - return $isConsuming && $this->batchCounter != 0 && $this->batchCounter%$this->prefetchCount == 0; - } + $processFlags = $this->analyzeProcessFlags($processFlags); + foreach ($processFlags as $deliveryTag => $processFlag) { + $this->handleProcessFlag($deliveryTag, $processFlag); + } - /** - * @inheritDoc - */ - public function stopConsuming() - { - $this->batchConsume(); + $this->consumed++; + $this->maybeStopConsumer(); - parent::stopConsuming(); + if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { + $this->stopConsuming(); + } } /** - * @inheritDoc + * @param int $deliveryTag + * @param mixed $processFlag + * + * @return void */ - protected function handleProcessMessage(AMQPMessage $msg, $processFlag) + private function handleProcessFlag ($deliveryTag, $processFlag) { - $isRejectedOrReQueued = false; - if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) { // Reject and requeue message to RabbitMQ - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true); - $isRejectedOrReQueued = true; + $this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, true); } else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) { // NACK and requeue message to RabbitMQ - $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true); - $isRejectedOrReQueued = true; + $this->getMessageChannel($deliveryTag)->basic_nack($deliveryTag, false, true); } else if ($processFlag === ConsumerInterface::MSG_REJECT) { // Reject and drop - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); + $this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, false); + } else { + // Remove message from queue only if callback return not false + $this->getMessageChannel($deliveryTag)->basic_ack($deliveryTag); } + } - $this->consumed++; - $this->maybeStopConsumer(); - if (!$isRejectedOrReQueued) { - $this->addMessage($msg); - } + /** + * @param bool $isConsuming + * + * @return bool + */ + protected function isCompleteBatch($isConsuming) + { + return $isConsuming && $this->batchCounter === $this->prefetchCount; + } - if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { + /** + * @param AMQPMessage $msg + * + * @return void + * + * @throws \Error + * @throws \Exception + */ + public function processMessage(AMQPMessage $msg) + { + try { + call_user_func(array($this, 'execute'), $msg); + } catch (Exception\StopConsumerException $e) { + $this->logger->info('Consumer requested restart', array( + 'amqp' => array( + 'queue' => $this->queueOptions['name'], + 'message' => $msg, + 'stacktrace' => $e->getTraceAsString() + ) + )); $this->stopConsuming(); + } catch (\Exception $e) { + $this->logger->error($e->getMessage(), array( + 'amqp' => array( + 'queue' => $this->queueOptions['name'], + 'message' => $msg, + 'stacktrace' => $e->getTraceAsString() + ) + )); + $this->batchConsume(); + + throw $e; + } catch (\Error $e) { + $this->logger->error($e->getMessage(), array( + 'amqp' => array( + 'queue' => $this->queueOptions['name'], + 'message' => $msg, + 'stacktrace' => $e->getTraceAsString() + ) + )); + $this->batchConsume(); + + throw $e; } } /** - * @param bool $hasExceptions + * @param mixed $processFlags * - * @return void + * @return array */ - private function resetBatch($hasExceptions = false) + private function analyzeProcessFlags($processFlags = null) { - if ($hasExceptions) { - array_map(function($message) { - $message['channel']->basic_reject($message['tag'], true); - }, $this->messages); - } else { - array_map(function($message) { - $message['channel']->basic_ack($message['tag']); - }, $this->messages); + if (is_array($processFlags)) { + if (count($processFlags) !== $this->batchCounter) { + throw new AMQPRuntimeException( + 'Method batchExecute() should return an array with elements equal with the number of messages processed' + ); + } + + return $processFlags; } + $response = array(); + foreach ($this->messages as $deliveryTag => $message) { + $response[$deliveryTag] = $processFlags; + } + + return $response; + } + + + /** + * @return void + */ + private function resetBatch() + { $this->messages = array(); $this->batchCounter = 0; } @@ -170,24 +314,205 @@ private function resetBatch($hasExceptions = false) */ private function addMessage(AMQPMessage $message) { - $this->messages[$this->batchCounter++] = array( - 'channel' => $message->delivery_info['channel'], - 'tag' => $message->delivery_info['delivery_tag'], - ); + $this->batchCounter++; + $this->messages[(int)$message->delivery_info['delivery_tag']] = $message; + } + + /** + * @param int $deliveryTag + * + * @return AMQPMessage + */ + public function getMessage($deliveryTag) + { + return isset($this->messages[$deliveryTag]) + ? $this->messages[$deliveryTag] + : null + ; + } + + /** + * @param int $deliveryTag + * + * @return AMQPChannel + * + * @throws AMQPRuntimeException + */ + public function getMessageChannel($deliveryTag) + { + $message = $this->getMessage($deliveryTag); + if (!$message) { + throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag)); + } + + return $message->delivery_info['channel']; + } + + /** + * @return void + */ + public function stopConsuming() + { + $this->batchConsume(); + + $this->getChannel()->basic_cancel($this->getConsumerTag()); + } + + /** + * @return void + */ + protected function setupConsumer() + { + if ($this->autoSetupFabric) { + $this->setupFabric(); + } + + $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, array($this, 'processMessage')); + } + + /** + * @return void + * + * @throws \BadFunctionCallException + */ + protected function maybeStopConsumer() + { + if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) { + if (!function_exists('pcntl_signal_dispatch')) { + throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called."); + } + + pcntl_signal_dispatch(); + } + + if ($this->forceStop) { + $this->stopConsuming(); + } else { + return; + } } /** - * @param \Closure $callback + * @param string $tag * * @return $this */ - public function setBatchCallback($callback) + public function setConsumerTag($tag) { - $this->batchCallback = $callback; + $this->consumerTag = $tag; return $this; } + /** + * @return string + */ + public function getConsumerTag() + { + return $this->consumerTag; + } + + /** + * @return void + */ + public function forceStopConsumer() + { + $this->forceStop = true; + } + + /** + * Sets the qos settings for the current channel + * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0 + * + * @param int $prefetchSize + * @param int $prefetchCount + * @param bool $global + */ + public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false) + { + $this->prefetchCount = $prefetchCount; + $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global); + } + + /** + * @param int $idleTimeout + * + * @return $this + */ + public function setIdleTimeout($idleTimeout) + { + $this->idleTimeout = $idleTimeout; + + return $this; + } + + /** + * Set exit code to be returned when there is a timeout exception + * + * @param int $idleTimeoutExitCode + * + * @return $this + */ + public function setIdleTimeoutExitCode($idleTimeoutExitCode) + { + $this->idleTimeoutExitCode = $idleTimeoutExitCode; + + return $this; + } + + /** + * Purge the queue + */ + public function purge() + { + $this->getChannel()->queue_purge($this->queueOptions['name'], true); + } + + /** + * Delete the queue + */ + public function delete() + { + $this->getChannel()->queue_delete($this->queueOptions['name'], true); + } + + /** + * Checks if memory in use is greater or equal than memory allowed for this process + * + * @return boolean + */ + protected function isRamAlmostOverloaded() + { + return (memory_get_usage(true) >= ($this->getMemoryLimit() * 1048576)); + } + + /** + * @return int + */ + public function getIdleTimeout() + { + return $this->idleTimeout; + } + + /** + * Get exit code to be returned when there is a timeout exception + * + * @return int|null + */ + public function getIdleTimeoutExitCode() + { + return $this->idleTimeoutExitCode; + } + + /** + * Resets the consumed property. + * Use when you want to call start() or consume() multiple times. + */ + public function resetConsumed() + { + $this->consumed = 0; + } + /** * @param int $timeout * @@ -229,14 +554,22 @@ public function getPrefetchCount() } /** - * @param int $timeout + * Set the memory limit * - * @return void + * @param int $memoryLimit + */ + public function setMemoryLimit($memoryLimit) + { + $this->memoryLimit = $memoryLimit; + } + + /** + * Get the memory limit * - * @throws AMQPTimeoutException + * @return int */ - private function consumeMessage($timeout) + public function getMemoryLimit() { - $this->getChannel()->wait(null, false, $timeout); + return $this->memoryLimit; } -} \ No newline at end of file +} From 599b6756b3434a45284d662633ef0c6856046462 Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Tue, 6 Jun 2017 19:05:54 +0300 Subject: [PATCH 18/22] [README] updated readme for batch consumer --- README.md | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/README.md b/README.md index efca92b4..23a7bef4 100644 --- a/README.md +++ b/README.md @@ -755,6 +755,100 @@ $ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher The only new option compared to the commands that we have seen before is the one that specifies the __routing key__: `-r '#.error'`. +### Batch Consumers ### + +In some cases you will want to get a batch of messages and then do some processing on all of them. Batch consumers will allow you to define logic for this type of processing. + +e.g: Imagine that you have a queue where you receive a message for inserting some information in the database, and you realize that if you do a batch insert is much better then by inserting one by one. + +Define a callback service that implements `BatchConsumerInterface` and add the definition of the consumer to your configuration. + +```yaml +batch_consumers: + batch_basic_consumer: + connection: default + exchange_options: {name: 'batch', type: fanout} + queue_options: {name: 'batch'} + callback: batch.basic + qos_options: {prefetch_size: 0, prefetch_count: 2, global: false} + timeout_wait: 5 + auto_setup_fabric: false + idle_timeout_exit_code: -2 +``` + +You can implement a batch consumer that will acknoledge all messages in one return or you can have control on what message to acknoledge. + +```php +namespace AppBundle\Service; + +use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface; +use PhpAmqpLib\Message\AMQPMessage; + +class DevckBasicConsumer implements BatchConsumerInterface +{ + /** + * @inheritDoc + */ + public function batchExecute(array $messages) + { + echo sprintf('Doing batch execution%s', PHP_EOL); + foreach ($messages as $message) { + $this->executeSomeLogicPerMessage($message); + } + + // you ack all messages got in batch + return true; + } +} + +``` +namespace AppBundle\Service; + +use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface; +use PhpAmqpLib\Message\AMQPMessage; + +class DevckBasicConsumer implements BatchConsumerInterface +{ + /** + * @inheritDoc + */ + public function batchExecute(array $messages) + { + echo sprintf('Doing batch execution%s', PHP_EOL); + $result = []; + /** @var AMQPMessage $message */ + foreach ($messages as $message) { + $result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message); + } + + // you ack only some messages that have return true + // e.g: + // $return = [ + // 1 => true, + // 2 => true, + // 3 => false, + // 4 => true, + // 5 => -1, + // 6 => 2, + // ]; + // The following will happen: + // * ack: 1,2,4 + // * reject and requeq: 3 + // * nack and requeue: 6 + // * reject and drop: 5 + return $result; + } +} +``` + +How to run the following batch consumer: + +```bash + $ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w +``` + +Important: BatchConsumers will not have the -m|messages option available + ### STDIN Producer ### There's a Command that reads data from STDIN and publishes it to a RabbitMQ queue. To use it first you have to configure a `producer` service in your configuration file like this: From 938d944888ac36c3b19cb34b10e93111319e557d Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Wed, 7 Jun 2017 15:24:06 +0300 Subject: [PATCH 19/22] [change-req] implemented change requests --- Command/BatchConsumerCommand.php | 16 +++++----------- README.md | 5 +++-- RabbitMq/BatchConsumer.php | 26 +++++++++++++------------- 3 files changed, 21 insertions(+), 26 deletions(-) diff --git a/Command/BatchConsumerCommand.php b/Command/BatchConsumerCommand.php index 68c65f10..52a7e8ee 100644 --- a/Command/BatchConsumerCommand.php +++ b/Command/BatchConsumerCommand.php @@ -9,16 +9,13 @@ use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -class BatchConsumerCommand extends BaseRabbitMqCommand +final class BatchConsumerCommand extends BaseRabbitMqCommand { /** * @var BatchConsumer */ protected $consumer; - /** - * @return void - */ public function stopConsumer() { if ($this->consumer instanceof BatchConsumer) { @@ -32,12 +29,6 @@ public function stopConsumer() } } - public function restartConsumer() - { - // TODO: Implement restarting of consumer - } - - protected function configure() { parent::configure(); @@ -97,7 +88,10 @@ protected function initConsumer(InputInterface $input) $this->consumer = $this->getContainer() ->get(sprintf($this->getConsumerService(), $input->getArgument('name'))); - if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) { + if (null !== $input->getOption('memory-limit') && + ctype_digit((string) $input->getOption('memory-limit')) && + $input->getOption('memory-limit') > 0 + ) { $this->consumer->setMemoryLimit($input->getOption('memory-limit')); } $this->consumer->setRoutingKey($input->getOption('route')); diff --git a/README.md b/README.md index 23a7bef4..538afe3b 100644 --- a/README.md +++ b/README.md @@ -776,7 +776,7 @@ batch_consumers: idle_timeout_exit_code: -2 ``` -You can implement a batch consumer that will acknoledge all messages in one return or you can have control on what message to acknoledge. +You can implement a batch consumer that will acknowledge all messages in one return or you can have control on what message to acknoledge. ```php namespace AppBundle\Service; @@ -800,8 +800,9 @@ class DevckBasicConsumer implements BatchConsumerInterface return true; } } - ``` + +```php namespace AppBundle\Service; use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface; diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 4260ca44..323027d6 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -7,57 +7,57 @@ use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; -class BatchConsumer extends BaseAmqp implements DequeuerInterface +final class BatchConsumer extends BaseAmqp implements DequeuerInterface { /** * @var int */ - protected $consumed = 0; + private $consumed = 0; /** * @var \Closure|callable */ - protected $callback; + private $callback; /** * @var bool */ - protected $forceStop = false; + private $forceStop = false; /** * @var int */ - protected $idleTimeout = 0; + private $idleTimeout = 0; /** * @var int */ - protected $idleTimeoutExitCode; + private $idleTimeoutExitCode; /** * @var int */ - protected $memoryLimit = null; + private $memoryLimit = null; /** * @var int */ - protected $prefetchCount; + private $prefetchCount; /** * @var int */ - protected $timeoutWait = 3; + private $timeoutWait = 3; /** * @var array */ - protected $messages = array(); + private $messages = array(); /** * @var int */ - protected $batchCounter = 0; + private $batchCounter = 0; /** * @param \Closure|callable $callback @@ -86,7 +86,7 @@ public function execute(AMQPMessage $msg) $this->maybeStopConsumer(); - if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { + if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) { $this->stopConsuming(); } } @@ -188,7 +188,7 @@ protected function handleProcessMessages($processFlags = null) $this->consumed++; $this->maybeStopConsumer(); - if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { + if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) { $this->stopConsuming(); } } From 3e0faa33ce0f8099ddff449516c774f32df618dd Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Thu, 8 Jun 2017 08:58:56 +0300 Subject: [PATCH 20/22] [fix] added method call to idletimeout_exit_code --- DependencyInjection/OldSoundRabbitMqExtension.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/DependencyInjection/OldSoundRabbitMqExtension.php b/DependencyInjection/OldSoundRabbitMqExtension.php index 8207400c..598dc4d1 100644 --- a/DependencyInjection/OldSoundRabbitMqExtension.php +++ b/DependencyInjection/OldSoundRabbitMqExtension.php @@ -383,6 +383,10 @@ protected function loadBatchConsumers() )) ; + if (isset($consumer['idle_timeout_exit_code'])) { + $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code'])); + } + if (isset($consumer['idle_timeout'])) { $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout'])); } From 6046a41a7f373c8e279258d2117334646275562f Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Thu, 8 Jun 2017 09:47:08 +0300 Subject: [PATCH 21/22] [fixed] implemented suggested code review --- RabbitMq/BaseConsumer.php | 2 -- RabbitMq/BatchConsumer.php | 6 ++---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/RabbitMq/BaseConsumer.php b/RabbitMq/BaseConsumer.php index 56320937..06cf7a8a 100644 --- a/RabbitMq/BaseConsumer.php +++ b/RabbitMq/BaseConsumer.php @@ -90,8 +90,6 @@ protected function maybeStopConsumer() if ($this->forceStop || ($this->consumed == $this->target && $this->target > 0)) { $this->stopConsuming(); - } else { - return; } } diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 323027d6..3d5ed094 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -121,13 +121,13 @@ public function consume() $this->batchConsume(); } - $timeoutWanted = ($isConsuming) ? $this->getTimeoutWait() : $this->getIdleTimeout(); + $timeoutWanted = $isConsuming ? $this->getTimeoutWait() : $this->getIdleTimeout(); } } public function batchConsume() { - if ($this->batchCounter == 0) { + if ($this->batchCounter === 0) { return; } @@ -387,8 +387,6 @@ protected function maybeStopConsumer() if ($this->forceStop) { $this->stopConsuming(); - } else { - return; } } From a39a2a016c6e983a78c37f72b13679e01885133e Mon Sep 17 00:00:00 2001 From: Bogdan Rancichi Date: Thu, 8 Jun 2017 11:24:01 +0300 Subject: [PATCH 22/22] [fix] remove unused call; remove finally clause; made some methods private --- Command/BatchConsumerCommand.php | 1 - RabbitMq/BatchConsumer.php | 10 ++++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Command/BatchConsumerCommand.php b/Command/BatchConsumerCommand.php index 52a7e8ee..907d5e63 100644 --- a/Command/BatchConsumerCommand.php +++ b/Command/BatchConsumerCommand.php @@ -68,7 +68,6 @@ protected function execute(InputInterface $input, OutputInterface $output) pcntl_signal(SIGTERM, array(&$this, 'stopConsumer')); pcntl_signal(SIGINT, array(&$this, 'stopConsumer')); -// pcntl_signal(SIGHUP, array(&$this, 'restartConsumer')); } if (defined('AMQP_DEBUG') === false) { diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 3d5ed094..54fd535f 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -158,6 +158,7 @@ public function batchConsume() 'stacktrace' => $e->getTraceAsString() ) )); + $this->resetBatch(); throw $e; } catch (\Error $e) { $this->logger->error($e->getMessage(), array( @@ -167,10 +168,11 @@ public function batchConsume() 'stacktrace' => $e->getTraceAsString() ) )); - throw $e; - } finally { $this->resetBatch(); + throw $e; } + + $this->resetBatch(); } /** @@ -323,7 +325,7 @@ private function addMessage(AMQPMessage $message) * * @return AMQPMessage */ - public function getMessage($deliveryTag) + private function getMessage($deliveryTag) { return isset($this->messages[$deliveryTag]) ? $this->messages[$deliveryTag] @@ -338,7 +340,7 @@ public function getMessage($deliveryTag) * * @throws AMQPRuntimeException */ - public function getMessageChannel($deliveryTag) + private function getMessageChannel($deliveryTag) { $message = $this->getMessage($deliveryTag); if (!$message) {