diff --git a/.travis.yml b/.travis.yml index f9bf5a3c..03ce46d0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ language: php sudo: false php: + - 7.2 - 7.1 - 7.0 - 5.6 @@ -34,6 +35,8 @@ matrix: env: SYMFONY_VERSION=3.0.* - php: 7.1 env: DEPENDENCIES=beta + - php: 7.2 + env: DEPENDENCIES=beta allow_failures: - php: nightly diff --git a/DependencyInjection/Compiler/RegisterPartsPass.php b/DependencyInjection/Compiler/RegisterPartsPass.php index 9285f384..c6d7a06d 100644 --- a/DependencyInjection/Compiler/RegisterPartsPass.php +++ b/DependencyInjection/Compiler/RegisterPartsPass.php @@ -25,6 +25,7 @@ public function process(ContainerBuilder $container) 'old_sound_rabbit_mq.consumer', 'old_sound_rabbit_mq.multi_consumer', 'old_sound_rabbit_mq.anon_consumer', + 'old_sound_rabbit_mq.batch_consumer', 'old_sound_rabbit_mq.rpc_client', 'old_sound_rabbit_mq.rpc_server', ); diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 24cc5e57..d7070536 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -30,9 +30,8 @@ public function __construct($name) public function getConfigTreeBuilder() { - $tree = new TreeBuilder(); - - $rootNode = $tree->root($this->name); + $tree = new TreeBuilder($this->name); + $rootNode = \method_exists(TreeBuilder::class, 'getRootNode') ? $tree->getRootNode() : $tree->root($this->name); $rootNode ->children() @@ -233,6 +232,13 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node) ->scalarNode('callback')->isRequired()->end() ->scalarNode('idle_timeout')->end() ->scalarNode('idle_timeout_exit_code')->end() + ->arrayNode('graceful_max_execution') + ->canBeUnset() + ->children() + ->integerNode('timeout')->end() + ->integerNode('exit_code')->defaultValue(0)->end() + ->end() + ->end() ->scalarNode('auto_setup_fabric')->defaultTrue()->end() ->arrayNode('qos_options') ->canBeUnset() diff --git a/DependencyInjection/OldSoundRabbitMqExtension.php b/DependencyInjection/OldSoundRabbitMqExtension.php index 785c6c49..0cd81c4a 100644 --- a/DependencyInjection/OldSoundRabbitMqExtension.php +++ b/DependencyInjection/OldSoundRabbitMqExtension.php @@ -262,8 +262,8 @@ protected function loadMultipleConsumers() } $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%'); - $definition->setPublic(true); $definition + ->setPublic(true) ->addTag('old_sound_rabbit_mq.base_amqp') ->addTag('old_sound_rabbit_mq.multi_consumer') ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))) @@ -336,8 +336,8 @@ protected function loadDynamicConsumers() } $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%'); - $definition->setPublic(true); $definition + ->setPublic(true) ->addTag('old_sound_rabbit_mq.base_amqp') ->addTag('old_sound_rabbit_mq.consumer') ->addTag('old_sound_rabbit_mq.dynamic_consumer') @@ -363,6 +363,16 @@ protected function loadDynamicConsumers() if (isset($consumer['idle_timeout_exit_code'])) { $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code'])); } + if (isset($consumer['graceful_max_execution'])) { + $definition->addMethodCall( + 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', + array($consumer['graceful_max_execution']['timeout']) + ); + $definition->addMethodCall( + 'setGracefulMaxExecutionTimeoutExitCode', + array($consumer['graceful_max_execution']['exit_code']) + ); + } if (!$consumer['auto_setup_fabric']) { $definition->addMethodCall('disableAutoSetupFabric'); } @@ -387,13 +397,13 @@ protected function loadBatchConsumers() { foreach ($this->config['batch_consumers'] as $key => $consumer) { $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%'); - $definition->setPublic(true); if (!isset($consumer['exchange_options'])) { $consumer['exchange_options'] = $this->getDefaultExchangeOptions(); } $definition + ->setPublic(true) ->addTag('old_sound_rabbit_mq.base_amqp') ->addTag('old_sound_rabbit_mq.batch_consumer') ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])) @@ -448,9 +458,8 @@ protected function loadAnonConsumers() { foreach ($this->config['anon_consumers'] as $key => $anon) { $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%'); - $definition->setPublic(true); - $definition + ->setPublic(true) ->addTag('old_sound_rabbit_mq.base_amqp') ->addTag('old_sound_rabbit_mq.anon_consumer') ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options']))) diff --git a/Provider/ConnectionParametersProviderInterface.php b/Provider/ConnectionParametersProviderInterface.php index ae8e72fa..6dcaf219 100644 --- a/Provider/ConnectionParametersProviderInterface.php +++ b/Provider/ConnectionParametersProviderInterface.php @@ -25,8 +25,12 @@ interface ConnectionParametersProviderInterface * 'keepalive' => false, * 'heartbeat' => 0, * 'use_socket' => true, + * 'constructor_args' => array(...) * ) * + * If constructor_args is present, all the other parameters are ignored; constructor_args are passes as constructor + * arguments. + * * @return array */ public function getConnectionParameters(); diff --git a/RabbitMq/AMQPConnectionFactory.php b/RabbitMq/AMQPConnectionFactory.php index 0adac75d..3e8092d5 100644 --- a/RabbitMq/AMQPConnectionFactory.php +++ b/RabbitMq/AMQPConnectionFactory.php @@ -60,22 +60,45 @@ public function __construct( */ public function createConnection() { - return new $this->class( - $this->parameters['host'], - $this->parameters['port'], - $this->parameters['user'], - $this->parameters['password'], - $this->parameters['vhost'], - false, // insist - 'AMQPLAIN', // login_method - null, // login_response - 'en_US', // locale - $this->parameters['connection_timeout'], - $this->parameters['read_write_timeout'], - $this->parameters['ssl_context'], - $this->parameters['keepalive'], - $this->parameters['heartbeat'] - ); + if (isset($this->parameters['constructor_args']) && is_array($this->parameters['constructor_args'])) { + $ref = new \ReflectionClass($this->class); + return $ref->newInstanceArgs($this->parameters['constructor_args']); + } + + if ($this->class == 'PhpAmqpLib\Connection\AMQPSocketConnection' || is_subclass_of($this->class , 'PhpAmqpLib\Connection\AMQPSocketConnection')) { + return new $this->class( + $this->parameters['host'], + $this->parameters['port'], + $this->parameters['user'], + $this->parameters['password'], + $this->parameters['vhost'], + false, // insist + 'AMQPLAIN', // login_method + null, // login_response + 'en_US', // locale + isset($this->parameters['read_timeout']) ? $this->parameters['read_timeout'] : $this->parameters['read_write_timeout'], + $this->parameters['keepalive'], + isset($this->parameters['write_timeout']) ? $this->parameters['write_timeout'] : $this->parameters['read_write_timeout'], + $this->parameters['heartbeat'] + ); + } else { + return new $this->class( + $this->parameters['host'], + $this->parameters['port'], + $this->parameters['user'], + $this->parameters['password'], + $this->parameters['vhost'], + false, // insist + 'AMQPLAIN', // login_method + null, // login_response + 'en_US', // locale + $this->parameters['connection_timeout'], + $this->parameters['read_write_timeout'], + $this->parameters['ssl_context'], + $this->parameters['keepalive'], + $this->parameters['heartbeat'] + ); + } } /** diff --git a/Resources/config/rabbitmq.xml b/Resources/config/rabbitmq.xml index 94302335..28151ed9 100644 --- a/Resources/config/rabbitmq.xml +++ b/Resources/config/rabbitmq.xml @@ -27,7 +27,7 @@ - + diff --git a/Tests/RabbitMq/AMQPConnectionFactoryTest.php b/Tests/RabbitMq/AMQPConnectionFactoryTest.php index cd4004c2..09a373b9 100644 --- a/Tests/RabbitMq/AMQPConnectionFactoryTest.php +++ b/Tests/RabbitMq/AMQPConnectionFactoryTest.php @@ -5,6 +5,7 @@ use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface; use OldSound\RabbitMqBundle\RabbitMq\AMQPConnectionFactory; use OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection; +use OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSocketConnection; use PHPUnit\Framework\TestCase; class AMQPConnectionFactoryTest extends TestCase @@ -37,6 +38,63 @@ public function testDefaultValues() ), $instance->constructParams); } + public function testSocketConnection() + { + $factory = new AMQPConnectionFactory( + 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSocketConnection', + array() + ); + + /** @var AMQPSocketConnection $instance */ + $instance = $factory->createConnection(); + $this->assertInstanceOf('PhpAmqpLib\Connection\AMQPSocketConnection', $instance); + $this->assertEquals(array( + 'localhost', // host + 5672, // port + 'guest', // user + 'guest', // password + '/', // vhost + false, // insist + "AMQPLAIN", // login method + null, // login response + "en_US", // locale + 3, // read_timeout + false, // keepalive + 3, // write_timeout + 0, // heartbeat + ), $instance->constructParams); + } + + public function testSocketConnectionWithParams() + { + $factory = new AMQPConnectionFactory( + 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSocketConnection', + array( + 'read_timeout' => 31, + 'write_timeout' => 32, + ) + ); + + /** @var AMQPSocketConnection $instance */ + $instance = $factory->createConnection(); + $this->assertInstanceOf('PhpAmqpLib\Connection\AMQPSocketConnection', $instance); + $this->assertEquals(array( + 'localhost', // host + 5672, // port + 'guest', // user + 'guest', // password + '/', // vhost + false, // insist + "AMQPLAIN", // login method + null, // login response + "en_US", // locale + 31, // read_timeout + false, // keepalive + 32, // write_timeout + 0, // heartbeat + ), $instance->constructParams); + } + public function testStandardConnectionParameters() { $factory = new AMQPConnectionFactory( @@ -211,6 +269,28 @@ public function testSSLConnectionParameters() ), $instance->constructParams); } + public function testConnectionsParametersProviderWithConstructorArgs() + { + $connectionParametersProvider = $this->prepareConnectionParametersProvider(); + $connectionParametersProvider->expects($this->once()) + ->method('getConnectionParameters') + ->will($this->returnValue( + array( + 'constructor_args' => array(1,2,3,4) + ) + )); + $factory = new AMQPConnectionFactory( + 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', + array(), + $connectionParametersProvider + ); + + /** @var AMQPConnection $instance */ + $instance = $factory->createConnection(); + $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); + $this->assertEquals(array(1,2,3,4), $instance->constructParams); + } + public function testConnectionsParametersProvider() { $connectionParametersProvider = $this->prepareConnectionParametersProvider(); diff --git a/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php b/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php new file mode 100644 index 00000000..48241bff --- /dev/null +++ b/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php @@ -0,0 +1,16 @@ +constructParams = func_get_args(); + } +} diff --git a/composer.json b/composer.json index 76bcf477..94866ac2 100644 --- a/composer.json +++ b/composer.json @@ -1,8 +1,8 @@ { - "name": "php-amqplib/rabbitmq-bundle", + "name": "ecentria/rabbitmq-bundle", "type": "symfony-bundle", "description": "Integrates php-amqplib with Symfony & RabbitMq. Formerly oldsound/rabbitmq-bundle.", - "keywords": ["symfony2", "rabbitmq", "message", "queue", "amqp"], + "keywords": ["symfony", "symfony2", "symfony3", "symfony4", "rabbitmq", "message", "queue", "amqp"], "license": "MIT", "authors": [{ "name" : "Alvaro Videla" @@ -24,7 +24,8 @@ "phpunit/phpunit": "^4.8.35|^5.4.3" }, "replace": { - "oldsound/rabbitmq-bundle": "self.version" + "oldsound/rabbitmq-bundle": "self.version", + "php-amqplib/rabbitmq-bundle": "self.version" }, "suggest": { "symfony/framework-bundle": "To use this lib as a full Symfony Bundle and to use the profiler data collector"