Skip to content

Async Message Queue

KerenSi edited this page Apr 21, 2016 · 23 revisions

Foundation Communication supports a simple API that exposes API or async messaging. Although JMS being a standard we've found that many times other vendors can do better. Many such vendors are not JMS Compliant (like RabbitMQ) or have a richer non-jms API (like HornetQ core).

Following is an explanation on how to use and configure the Queue API.

API

The Main building blocks are:

  • Message - Basic message POJO.
  • MessageConsumer - basic interface that defines API available for message consumers.
  • MessageProducer - basic interface that defines API available for message producers.
  • MessagingFactory - This is the main API to be used to instantiate new consumers and producers. This class supports a Per Thread lifecycle for HornetQ session, consumers and producers. We also have a RabbitMQ flavor.

Usage Example

Note: For both producer and consumer, the name is also used as a "configuration prefix" in order to focus the subset of the configuration per producer/consumer. Examples below.

Producer

final MessageProducer hqProducer = HornetQMessagingFactory.createProducer("example");
hqProducer.sendMessage("hello world!");

//optionally send message properties
Map<String,Object> props = new HashMap<String,Object>();
props.put("key1","value2");
hqProducer.sendMessage("hello world!",props);
        }

OR

final final MessageProducer rabbitProducer = RabbitMQMessagingFactory.createProducer("example");
rabbitProducer .sendMessage("hello world!");

//optionally send message properties
Map<String,Object> props = new HashMap<String,Object>();
props.put("key1","value2");
rabbitProducer .sendMessage("hello world!",props);
        }

Consumer

Depending on configuration consumers can be either in a 1:1 form (i.e. producer send message and only one consumer can get it), or 1:n (i.e. many consumers can the same copy although producer sent only once. A.K.A "JMS Topic").

Sync Receive

final MessageConsumer hqConsumer = HornetQMessagingFactory.createConsumer("consumer1");
Message message = hqConsumer .receive(2500);
system.out.println("got message: " + message.getPayloadAsString()):

OR

final MessageConsumer rabbitConsumer = RabbitMQMessagingFactory.createConsumer("consumer1");
Message message = rabbitConsumer .receive(2500);
system.out.println("got message: " + message.getPayloadAsString()):

A-Sync Receive

final MessageConsumer consumer = HornetQMessagingFactory.createConsumer("consumer1");
consumer.registerMessageHandler(new AbstractHornetQMessageHandler() {
    @Override
    public void process(Message message) {
        System.out.println("[1] " + message.getPayloadAsString());
    }
});

OR

final MessageConsumer rabbitConsumer = RabbitMQMessagingFactory.createConsumer("consumer1");
rabbitConsumer .registerMessageHandler(new AbstractRabbitMQMessageHandler() {
    @Override
    public void onMessage(Message message) {
        System.out.println("[1] " + message.getPayloadAsString());
    }
});

Note: HornetQ Core API is not thread safe, so you may want to consider wrapping the code in a Thread:

Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                final MessageConsumer consumer = HornetQMessagingFactory.createConsumer("consumer1");
consumer.registerMessageHandler(new AbstractHornetQMessageHandler() {
    @Override
    public void onMessage(Message message) {
        System.out.println("[1] " + message.getPayloadAsString());
    }
});
            }
        });

        t.setDaemon(false);

        t.start();

A-Sync Receive - preserving order

The lib has support for preserving message ordering in case you want to handle messages in a thread pool so the the listener thread isn't a bottleneck. For this use either AbstractHornetQConcurrentMessageHandler or AbstractRabbitMQConcurrentMessageHandler as your handler super class. When implementing concurrent handler you should implement 'process' instead of 'onMessage'.

Dependencies

Consumer Dependencies

General:

            <dependency>
	        	<groupId>com.cisco.oss.foundation.queue</groupId>
	        	<artifactId>queue-api</artifactId>
	        </dependency>

For RabbitMQ add:

             <dependency>
            	<groupId>com.cisco.oss.foundation.queue</groupId>
				<artifactId>queue-rabbitmq</artifactId>
            </dependency>       

For HornetQ 2.2 add:

           <dependency>
	            <groupId>org.hornetq</groupId>
	            <artifactId>hornetq-core-client</artifactId>
	        </dependency>
	        <dependency>
	            <groupId>org.hornetq</groupId>
	            <artifactId>hornetq-jms-client</artifactId>
	        </dependency>	        
            <dependency>
                <groupId>com.cisco.vss.foundation.queue</groupId>
                <artifactId>queue-hornetq-jms</artifactId>
            </dependency>
            <dependency>
	        	<groupId>org.hornetq</groupId>
	        	<artifactId>hornetq-core</artifactId>				
	        </dependency>

For HornetQ 2.4 add:

		<dependency>
			<groupId>com.cisco.oss.foundation.queue</groupId>
			<artifactId>queue-hornetq</artifactId>
		</dependency>

Configuration

Consumer Configuration

Name Description Default Value
<consumer-name>.queue.name Optional queue name. when not set, by default the component name and instance id will be used
<consumer-name>.queue.filter Optional filter string - For JMS Applications this is the JMS Selector string. For AMQP you can use this to set the routing key. N/A
<consumer-name>.queue.isSubscription set to true if this consumer is a 'Topic Consumer' false
<consumer-name>.queue.filters Optional filters array - This supported in AMQP only for cases you want multiple routing keys. Usage: <consumer-name>.queue.filters.1=RK1 <consumer-name>.queue.filters.1=RK1 etc. N/A
<consumer-name>.queue.subscribedTo if '' set to true specify here the address used by the producer when publishing a message N/A
<consumer-name>.queue.isDurable sets the consumer to be durable. i.e. survive restarts. true

Producer Configuration

Name Description Default Value
<producer-name>.queue.name Mandatory Queue/Address name. N/A
<producer-name>.queue.isDurable sets the producer to be durable. i.e. survive restarts. true
<producer-name>.queue.isPersistent sets the exchange created by this producer to be be persistent. true
<producer-name>.queue.expiration sets the expiration of messages create by this producer. Value in milliseconds. 1800000

Consumer configSchema.xml Configuration

	<Parameter name="{consumerName}" type="STRUCTURE" description="consumer" base="queue.consumer.base">
		<DefaultValue>
			<StructureValue>
				<StructureMemberValue name="queue.filter" value="{pointer}" />
				<StructureMemberValue name="queue.name" value="{consumerName}" />
				<StructureMemberValue name="queue.isSubscription" value="true" />
				<StructureMemberValue name="queue.subscribedTo" value="{topic}" />
			</StructureValue>
		</DefaultValue>
	</Parameter>

When implementing support for both HornetQ and RabbitMQ, you can also add:

	<Parameter name="{component_name}.messageQType" type="STRING" description="Message queue type; HornetQ or RabbitMQ. default=RabbitMQ">
		<DefaultValue>
			<PrimitiveValue value="RabbitMQ"/>
		</DefaultValue>
		<Range>                
			<StringEnum value="RabbitMQ"/>
			<StringEnum value="HornetQ"/>
		</Range> 
	</Parameter>

Configuration Example

#Client connections to HornetQ/RabbitMQ Server - you can define as many entries as needed. Note: Currently only a single RabbitMQ node is supported (or cluster using DNS RoundRobin)

#RabbitMQ
service.rabbitmq.connections.1.host=10.45.37.122
service.rabbitmq.connections.1.port=5445
service.rabbitmq.connections.2.host=10.45.37.123
service.rabbitmq.connections.2.port=5445

#HornetQ
service.hornetq.connections.1.host=10.45.37.122
service.hornetq.connections.1.port=5445
service.hornetq.connections.2.host=10.45.37.123
service.hornetq.connections.2.port=5445

#consumer properties
#-------------------

#consumer1 - this is the consumer name that was used in code to initialize a new consumer
consumer1.queue.name=consumer1Queue

#an optional filter - only messages with proeprties aligned with this filter will be received by this consumer
consumer1.queue.filter=key1='value2'

#set to true if this consumer is a part of a 1:N consumers (like a topic subscriber)
consumer1.queue.isSubscription=true

#if isSubscription is set to true, specify here the publisher address name
consumer1.queue.subscribedTo=myExample

#producer properties
#-------------------

#example is the producer name as supplied in code when initializing a new producer
example.queue.name=myExample