diff --git a/queue-rabbitmq/pom.xml b/queue-rabbitmq/pom.xml index 33aa548..48e1b40 100644 --- a/queue-rabbitmq/pom.xml +++ b/queue-rabbitmq/pom.xml @@ -24,7 +24,7 @@ com.cisco.oss.foundation.queue queue-rabbitmq queue-rabbitmq - 1.4.0-3-SNAPSHOT + 1.4.1-0-SNAPSHOT This project is a rabbitmq implementation for the queue-api library. https://github.com/foundation-runtime/communication/tree/master/queue-rabbitmq diff --git a/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessageConsumer.java b/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessageConsumer.java index 9f174f2..7c2c261 100644 --- a/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessageConsumer.java +++ b/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessageConsumer.java @@ -75,7 +75,7 @@ public class RabbitMQMessageConsumer implements MessageConsumer { } catch (InterruptedException e) { LOGGER.error("error waiting for init to finish: " + e); } - Channel channel = RabbitMQMessagingFactory.getChannel(); + Channel channel = RabbitMQMessagingFactory.getConsumerChannel(); channel.exchangeDeclare(subscribedTo, exchangeType, isDurable, false, false, null); Map args = new HashMap(); @@ -145,7 +145,7 @@ public void registerMessageHandler(MessageHandler messageHandler, boolean autoAc try { if (messageHandler instanceof AbstractRabbitMQMessageHandler) { String consumerTag = FlowContextFactory.getFlowContext() != null ? FlowContextFactory.getFlowContext().getUniqueId() : "N/A"; - Channel channel = RabbitMQMessagingFactory.getChannel(); + Channel channel = RabbitMQMessagingFactory.getConsumerChannel(); this.channelNumber = channel.getChannelNumber(); ((AbstractRabbitMQMessageHandler) messageHandler).setChannelNumber(channelNumber); this.consumerTag = channel.basicConsume(queueName, autoAck, consumerTag, (Consumer) messageHandler); diff --git a/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessageProducer.java b/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessageProducer.java index 9400de9..4789237 100644 --- a/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessageProducer.java +++ b/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessageProducer.java @@ -67,7 +67,7 @@ class RabbitMQMessageProducer extends AbstractMessageProducer { isPersistent = subset.getBoolean("queue.isPersistent", true); try { - Channel channel = RabbitMQMessagingFactory.getChannel(); + Channel channel = RabbitMQMessagingFactory.getProducerChannel(); channel.exchangeDeclare(queueName, exchangeType, isDurable, false, false, null); isInitialized.set(true); } catch (QueueException e) { @@ -108,7 +108,7 @@ public void sendMessage(byte[] message, Map messageHeaders) { sendMessageInternal(message, messageHeaders); }else{ try { - Channel channel = RabbitMQMessagingFactory.getChannel(); + Channel channel = RabbitMQMessagingFactory.getProducerChannel(); channel.exchangeDeclare(queueName, "topic", true, false, false, null); isInitialized.set(true); sendMessageInternal(message, messageHeaders); @@ -134,12 +134,12 @@ private void sendMessageInternal(byte[] message, Map messageHead if(routingKeyObj!= null && StringUtils.isNotBlank(routingKeyObj.toString())){ routingKey = routingKeyObj.toString(); } - RabbitMQMessagingFactory.getChannel().basicPublish(queueName, routingKey, basicProperties, message); + RabbitMQMessagingFactory.getProducerChannel().basicPublish(queueName, routingKey, basicProperties, message); } catch (AlreadyClosedException e) { LOGGER.warn("an error occurred trying to publish message: {}", e); RabbitMQMessagingFactory.channelThreadLocal.set(null); try { - RabbitMQMessagingFactory.getChannel().basicPublish(queueName, "", basicProperties, message); + RabbitMQMessagingFactory.getProducerChannel().basicPublish(queueName, "", basicProperties, message); } catch (Exception e1) { startReConnectThread(); throw new QueueException("an error occurred trying to publish message: " + e1, e1); diff --git a/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessagingFactory.java b/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessagingFactory.java index 2f7b46e..a76d8ad 100644 --- a/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessagingFactory.java +++ b/queue-rabbitmq/src/main/java/com/cisco/oss/foundation/message/RabbitMQMessagingFactory.java @@ -47,7 +47,8 @@ public class RabbitMQMessagingFactory { // private static List channels = new CopyOnWriteArrayList<>(); private static PriorityBlockingQueue messageAckQueue = new PriorityBlockingQueue(10000); static Map channels = new ConcurrentHashMap(); - private static Connection connection = null; + private static Connection consumerConnection = null; + private static Connection producerConnection = null; private static AtomicBoolean IS_RECONNECT_THREAD_RUNNING = new AtomicBoolean(false); static AtomicBoolean IS_CONNECTED = new AtomicBoolean(false); static CountDownLatch INIT_LATCH = new CountDownLatch(1); @@ -76,7 +77,12 @@ public void run() { channel.close(); } - connection.close(); + if (consumerConnection != null) { + consumerConnection.close(); + } + if (producerConnection != null) { + producerConnection.close(); + } } catch (Exception e) { LOGGER.error("can't close RabbitMQ resources, error: {}", e, e); @@ -167,25 +173,10 @@ static void connect() { // connectionFactory.setPort(Integer.parseInt(port)); } Address[] addrs = new Address[0]; - connection = connectionFactory.newConnection(addresses.toArray(addrs)); - connection.addBlockedListener(new BlockedListener() { - public void handleBlocked(String reason) throws IOException { - LOGGER.error("RabbitMQ connection is now blocked. Port: {}, Reason: {}", connection.getPort(), reason); - IS_BLOCKED.set(true); - } - - public void handleUnblocked() throws IOException { - LOGGER.info("RabbitMQ connection is now un-blocked. Port: {}", connection.getPort()); - IS_BLOCKED.set(false); - } - }); - - connection.addShutdownListener(new ShutdownListener() { - @Override - public void shutdownCompleted(ShutdownSignalException cause) { - LOGGER.error("Connection shutdown detected. Reason: {}", cause.toString(), cause); - } - }); + consumerConnection = connectionFactory.newConnection(addresses.toArray(addrs)); + producerConnection = connectionFactory.newConnection(addresses.toArray(addrs)); + addConnectionListeners(consumerConnection); + addConnectionListeners(producerConnection); IS_CONNECTED.set(true); INIT_LATCH.countDown(); @@ -197,11 +188,51 @@ public void shutdownCompleted(ShutdownSignalException cause) { } } - static Channel getChannel() { + private static void addConnectionListeners(final Connection connection) { + connection.addBlockedListener(new BlockedListener() { + public void handleBlocked(String reason) throws IOException { + LOGGER.error("RabbitMQ connection is now blocked. Port: {}, Reason: {}", connection.getPort(), reason); + IS_BLOCKED.set(true); + } + + public void handleUnblocked() throws IOException { + LOGGER.info("RabbitMQ connection is now un-blocked. Port: {}", connection.getPort()); + IS_BLOCKED.set(false); + } + }); + + connection.addShutdownListener(new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException cause) { + LOGGER.error("Connection shutdown detected. Reason: {}", cause.toString(), cause); + } + }); + } + + static Channel getConsumerChannel() { + try { + if (channelThreadLocal.get() == null) { + if (consumerConnection != null) { + Channel channel = consumerConnection.createChannel(); + channelThreadLocal.set(channel); + channels.put(channel.getChannelNumber(), channel); + } else { + throw new QueueException("RabbitMQ appears to be down. Please try again later."); + } + } + + return channelThreadLocal.get(); + } catch (IOException e) { + throw new QueueException("can't create channel: " + e.toString(), e); + } + + } + + static Channel getProducerChannel() { try { if (channelThreadLocal.get() == null) { - if (connection != null) { - Channel channel = connection.createChannel(); + if (producerConnection != null) { + Channel channel = producerConnection.createChannel(); channelThreadLocal.set(channel); channels.put(channel.getChannelNumber(), channel); } else { @@ -323,7 +354,7 @@ public void run() { public static boolean deleteQueue(String queueName) { try { - getChannel().queueDelete(queueName); + getConsumerChannel().queueDelete(queueName); return true; } catch (IOException e) { LOGGER.warn("can't delete queue: {}", e); @@ -333,7 +364,7 @@ public static boolean deleteQueue(String queueName) { public static boolean deleteQueue(String queueName, boolean deleteOnlyIfNotUsed, boolean deeltenlyIfNotEmpty) { try { - getChannel().queueDelete(queueName, deleteOnlyIfNotUsed, deeltenlyIfNotEmpty); + getConsumerChannel().queueDelete(queueName, deleteOnlyIfNotUsed, deeltenlyIfNotEmpty); return true; } catch (IOException e) { LOGGER.warn("can't delete queue: {}", e); @@ -343,7 +374,7 @@ public static boolean deleteQueue(String queueName, boolean deleteOnlyIfNotUsed, public static boolean deleteExchange(String exchangeName) { try { - getChannel().exchangeDelete(exchangeName); + getConsumerChannel().exchangeDelete(exchangeName); return true; } catch (IOException e) { LOGGER.warn("can't delete exchange: {}", e); @@ -354,7 +385,7 @@ public static boolean deleteExchange(String exchangeName) { public static boolean deleteExchange(String exchangeName, boolean deleteOnlyIfNotUsed) { try { - getChannel().exchangeDelete(exchangeName, deleteOnlyIfNotUsed); + getConsumerChannel().exchangeDelete(exchangeName, deleteOnlyIfNotUsed); return true; } catch (IOException e) { LOGGER.warn("can't delete exchange: {}", e);