Skip to content

Commit

Permalink
Consumers recovery after RabbitMQ restart (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yair Ogen committed Jul 5, 2016
1 parent d22aa95 commit f5074c5
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 35 deletions.
2 changes: 1 addition & 1 deletion queue-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<groupId>com.cisco.oss.foundation.queue</groupId>
<artifactId>queue-rabbitmq</artifactId>
<name>queue-rabbitmq</name>
<version>1.4.0-3-SNAPSHOT</version>
<version>1.4.1-0-SNAPSHOT</version>
<description>This project is a rabbitmq implementation for the queue-api library.</description>
<url>https://github.com/foundation-runtime/communication/tree/master/queue-rabbitmq</url>
<issueManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class RabbitMQMessageConsumer implements MessageConsumer {
} catch (InterruptedException e) {
LOGGER.error("error waiting for init to finish: " + e);
}
Channel channel = RabbitMQMessagingFactory.getChannel();
Channel channel = RabbitMQMessagingFactory.getConsumerChannel();
channel.exchangeDeclare(subscribedTo, exchangeType, isDurable, false, false, null);

Map<String, Object> args = new HashMap<String, Object>();
Expand Down Expand Up @@ -145,7 +145,7 @@ public void registerMessageHandler(MessageHandler messageHandler, boolean autoAc
try {
if (messageHandler instanceof AbstractRabbitMQMessageHandler) {
String consumerTag = FlowContextFactory.getFlowContext() != null ? FlowContextFactory.getFlowContext().getUniqueId() : "N/A";
Channel channel = RabbitMQMessagingFactory.getChannel();
Channel channel = RabbitMQMessagingFactory.getConsumerChannel();
this.channelNumber = channel.getChannelNumber();
((AbstractRabbitMQMessageHandler) messageHandler).setChannelNumber(channelNumber);
this.consumerTag = channel.basicConsume(queueName, autoAck, consumerTag, (Consumer) messageHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class RabbitMQMessageProducer extends AbstractMessageProducer {
isPersistent = subset.getBoolean("queue.isPersistent", true);

try {
Channel channel = RabbitMQMessagingFactory.getChannel();
Channel channel = RabbitMQMessagingFactory.getProducerChannel();
channel.exchangeDeclare(queueName, exchangeType, isDurable, false, false, null);
isInitialized.set(true);
} catch (QueueException e) {
Expand Down Expand Up @@ -108,7 +108,7 @@ public void sendMessage(byte[] message, Map<String, Object> messageHeaders) {
sendMessageInternal(message, messageHeaders);
}else{
try {
Channel channel = RabbitMQMessagingFactory.getChannel();
Channel channel = RabbitMQMessagingFactory.getProducerChannel();
channel.exchangeDeclare(queueName, "topic", true, false, false, null);
isInitialized.set(true);
sendMessageInternal(message, messageHeaders);
Expand All @@ -134,12 +134,12 @@ private void sendMessageInternal(byte[] message, Map<String, Object> messageHead
if(routingKeyObj!= null && StringUtils.isNotBlank(routingKeyObj.toString())){
routingKey = routingKeyObj.toString();
}
RabbitMQMessagingFactory.getChannel().basicPublish(queueName, routingKey, basicProperties, message);
RabbitMQMessagingFactory.getProducerChannel().basicPublish(queueName, routingKey, basicProperties, message);
} catch (AlreadyClosedException e) {
LOGGER.warn("an error occurred trying to publish message: {}", e);
RabbitMQMessagingFactory.channelThreadLocal.set(null);
try {
RabbitMQMessagingFactory.getChannel().basicPublish(queueName, "", basicProperties, message);
RabbitMQMessagingFactory.getProducerChannel().basicPublish(queueName, "", basicProperties, message);
} catch (Exception e1) {
startReConnectThread();
throw new QueueException("an error occurred trying to publish message: " + e1, e1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class RabbitMQMessagingFactory {
// private static List<Channel> channels = new CopyOnWriteArrayList<>();
private static PriorityBlockingQueue<AckNackMessage> messageAckQueue = new PriorityBlockingQueue<AckNackMessage>(10000);
static Map<Integer, Channel> channels = new ConcurrentHashMap<Integer, Channel>();
private static Connection connection = null;
private static Connection consumerConnection = null;
private static Connection producerConnection = null;
private static AtomicBoolean IS_RECONNECT_THREAD_RUNNING = new AtomicBoolean(false);
static AtomicBoolean IS_CONNECTED = new AtomicBoolean(false);
static CountDownLatch INIT_LATCH = new CountDownLatch(1);
Expand Down Expand Up @@ -76,7 +77,12 @@ public void run() {
channel.close();
}

connection.close();
if (consumerConnection != null) {
consumerConnection.close();
}
if (producerConnection != null) {
producerConnection.close();
}

} catch (Exception e) {
LOGGER.error("can't close RabbitMQ resources, error: {}", e, e);
Expand Down Expand Up @@ -167,25 +173,10 @@ static void connect() {
// connectionFactory.setPort(Integer.parseInt(port));
}
Address[] addrs = new Address[0];
connection = connectionFactory.newConnection(addresses.toArray(addrs));
connection.addBlockedListener(new BlockedListener() {
public void handleBlocked(String reason) throws IOException {
LOGGER.error("RabbitMQ connection is now blocked. Port: {}, Reason: {}", connection.getPort(), reason);
IS_BLOCKED.set(true);
}

public void handleUnblocked() throws IOException {
LOGGER.info("RabbitMQ connection is now un-blocked. Port: {}", connection.getPort());
IS_BLOCKED.set(false);
}
});

connection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
LOGGER.error("Connection shutdown detected. Reason: {}", cause.toString(), cause);
}
});
consumerConnection = connectionFactory.newConnection(addresses.toArray(addrs));
producerConnection = connectionFactory.newConnection(addresses.toArray(addrs));
addConnectionListeners(consumerConnection);
addConnectionListeners(producerConnection);

IS_CONNECTED.set(true);
INIT_LATCH.countDown();
Expand All @@ -197,11 +188,51 @@ public void shutdownCompleted(ShutdownSignalException cause) {
}
}

static Channel getChannel() {
private static void addConnectionListeners(final Connection connection) {
connection.addBlockedListener(new BlockedListener() {
public void handleBlocked(String reason) throws IOException {
LOGGER.error("RabbitMQ connection is now blocked. Port: {}, Reason: {}", connection.getPort(), reason);
IS_BLOCKED.set(true);
}

public void handleUnblocked() throws IOException {
LOGGER.info("RabbitMQ connection is now un-blocked. Port: {}", connection.getPort());
IS_BLOCKED.set(false);
}
});

connection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
LOGGER.error("Connection shutdown detected. Reason: {}", cause.toString(), cause);
}
});
}

static Channel getConsumerChannel() {
try {
if (channelThreadLocal.get() == null) {
if (consumerConnection != null) {
Channel channel = consumerConnection.createChannel();
channelThreadLocal.set(channel);
channels.put(channel.getChannelNumber(), channel);
} else {
throw new QueueException("RabbitMQ appears to be down. Please try again later.");
}
}

return channelThreadLocal.get();
} catch (IOException e) {
throw new QueueException("can't create channel: " + e.toString(), e);
}

}

static Channel getProducerChannel() {
try {
if (channelThreadLocal.get() == null) {
if (connection != null) {
Channel channel = connection.createChannel();
if (producerConnection != null) {
Channel channel = producerConnection.createChannel();
channelThreadLocal.set(channel);
channels.put(channel.getChannelNumber(), channel);
} else {
Expand Down Expand Up @@ -323,7 +354,7 @@ public void run() {

public static boolean deleteQueue(String queueName) {
try {
getChannel().queueDelete(queueName);
getConsumerChannel().queueDelete(queueName);
return true;
} catch (IOException e) {
LOGGER.warn("can't delete queue: {}", e);
Expand All @@ -333,7 +364,7 @@ public static boolean deleteQueue(String queueName) {

public static boolean deleteQueue(String queueName, boolean deleteOnlyIfNotUsed, boolean deeltenlyIfNotEmpty) {
try {
getChannel().queueDelete(queueName, deleteOnlyIfNotUsed, deeltenlyIfNotEmpty);
getConsumerChannel().queueDelete(queueName, deleteOnlyIfNotUsed, deeltenlyIfNotEmpty);
return true;
} catch (IOException e) {
LOGGER.warn("can't delete queue: {}", e);
Expand All @@ -343,7 +374,7 @@ public static boolean deleteQueue(String queueName, boolean deleteOnlyIfNotUsed,

public static boolean deleteExchange(String exchangeName) {
try {
getChannel().exchangeDelete(exchangeName);
getConsumerChannel().exchangeDelete(exchangeName);
return true;
} catch (IOException e) {
LOGGER.warn("can't delete exchange: {}", e);
Expand All @@ -354,7 +385,7 @@ public static boolean deleteExchange(String exchangeName) {

public static boolean deleteExchange(String exchangeName, boolean deleteOnlyIfNotUsed) {
try {
getChannel().exchangeDelete(exchangeName, deleteOnlyIfNotUsed);
getConsumerChannel().exchangeDelete(exchangeName, deleteOnlyIfNotUsed);
return true;
} catch (IOException e) {
LOGGER.warn("can't delete exchange: {}", e);
Expand Down

0 comments on commit f5074c5

Please sign in to comment.