-
Notifications
You must be signed in to change notification settings - Fork 13
Async Message Queue
Foundation Communication supports a simple API that exposes API or async messaging. Although JMS being a standard we've found that many times other vendors can do better. Many such vendors are not JMS Compliant (like RabbitMQ) or have a richer non-jms API (like HornetQ core).
Following is an explanation on how to use and configure the Queue API.
The Main building blocks are:
- Message - Basic message POJO.
- MessageConsumer - basic interface that defines API available for message consumers.
- MessageProducer - basic interface that defines API available for message producers.
- MessagingFactory - This is the main API to be used to instantiate new consumers and producers. This class supports a Per Thread lifecycle for HornetQ session, consumers and producers. We also have a RabbitMQ flavor.
Note: For both producer and consumer, the name is also used as a "configuration prefix" in order to focus the subset of the configuration per producer/consumer. Examples below.
final MessageProducer hqProducer = HornetQMessagingFactory.createProducer("example");
hqProducer.sendMessage("hello world!");
//optionally send message properties
Map<String,Object> props = new HashMap<String,Object>();
props.put("key1","value2");
hqProducer.sendMessage("hello world!",props);
}
OR
final final MessageProducer rabbitProducer = RabbitMQMessagingFactory.createProducer("example");
rabbitProducer .sendMessage("hello world!");
//optionally send message properties
Map<String,Object> props = new HashMap<String,Object>();
props.put("key1","value2");
rabbitProducer .sendMessage("hello world!",props);
}
Depending on configuration consumers can be either in a 1:1 form (i.e. producer send message and only one consumer can get it), or 1:n (i.e. many consumers can the same copy although producer sent only once. A.K.A "JMS Topic").
final MessageConsumer hqConsumer = HornetQMessagingFactory.createConsumer("consumer1");
Message message = hqConsumer .receive(2500);
system.out.println("got message: " + message.getPayloadAsString()):
OR
final MessageConsumer rabbitConsumer = RabbitMQMessagingFactory.createConsumer("consumer1");
Message message = rabbitConsumer .receive(2500);
system.out.println("got message: " + message.getPayloadAsString()):
final MessageConsumer consumer = HornetQMessagingFactory.createConsumer("consumer1");
consumer.registerMessageHandler(new AbstractHornetQMessageHandler() {
@Override
public void onMessage(Message message) {
System.out.println("[1] " + message.getPayloadAsString());
}
});
OR
final MessageConsumer rabbitConsumer = RabbitMQMessagingFactory.createConsumer("consumer1");
rabbitConsumer .registerMessageHandler(new AbstractRabbitMQMessageHandler() {
@Override
public void onMessage(Message message) {
System.out.println("[1] " + message.getPayloadAsString());
}
});
Note: HornetQ Core API is not thread safe, so you may want to consider wrapping the code in a Thread:
Thread t = new Thread(new Runnable() {
@Override
public void run() {
final MessageConsumer consumer = HornetQMessagingFactory.createConsumer("consumer1");
consumer.registerMessageHandler(new AbstractHornetQMessageHandler() {
@Override
public void onMessage(Message message) {
System.out.println("[1] " + message.getPayloadAsString());
}
});
}
});
t.setDaemon(false);
t.start();
The lib has support for preserving message ordering in case you want to handle messages in a thread pool so the the listener thread isn't a bottleneck. For this use either AbstractHornetQConcurrentMessageHandler or AbstractRabbitMQConcurrentMessageHandler as your handler super class.
Name | Description | Default Value |
---|---|---|
<consumer-name>.queue.name | Optional queue name. | when not set, by default the component name and instance id will be used |
<consumer-name>.queue.filter | Optional filter string - For JMS Applications this is the JMS Selector string. For AMQP you can use this to set the routing key. | N/A |
<consumer-name>.queue.isSubscription | set to true if this consumer is a 'Topic Consumer' | false |
<consumer-name>.queue.filters | Optional filters array - This supported in AMQP only for cases you want multiple routing keys. Usage: <consumer-name>.queue.filters.1=RK1 <consumer-name>.queue.filters.1=RK1 etc. | N/A |
<consumer-name>.queue.subscribedTo | if '' set to true specify here the address used by the producer when publishing a message | N/A |
<consumer-name>.queue.isDurable | sets the consumer to be durable. i.e. survive restarts. | true |
Name | Description | Default Value |
---|---|---|
<producer-name>.queue.name | Mandatory Queue/Address name. | N/A |
<producer-name>.queue.isDurable | sets the producer to be durable. i.e. survive restarts. | true |
#Client connections to HornetQ/RabbitMQ Server - you can define as many entries as needed. Note: Currently only a single RabbitMQ node is supported
service.queue.connections.1.host=10.45.37.122
service.queue.connections.1.port=5445
service.queue.connections.2.host=10.45.37.123
service.queue.connections.2.port=5445
#consumer properties
#-------------------
#consumer1 - this is the consumer name that was used in code to initialize a new consumer
consumer1.queue.name=consumer1Queue
#an optional filter - only messages with proeprties aligned with this filter will be received by this consumer
consumer1.queue.filter=key1='value2'
#set to true if this consumer is a part of a 1:N consumers (like a topic subscriber)
consumer1.queue.isSubscription=true
#if isSubscription is set to true, specify here the publisher address name
consumer1.queue.subscribedTo=myExample
#producer properties
#-------------------
#example is the producer name as supplied in code when initializing a new producer
example.queue.name=myExample