Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
add the dependency below to your project
<dependency>
<groupId>com.xjbg</groupId>
<artifactId>rocketmq-sdk</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
you can choose rocketmq or aliyun ons,simple examples:
private MqProducerProperties mqProducerProperties() {
MqProducerProperties properties = new MqProducerProperties();
properties.setProducerGroup(MqConstants.DEFAULT_PRODUCER_GROUP);
properties.setNamesrvAddr("localhost:9876");
return properties;
}
private OnsProducerProperties onsProducerProperties() {
OnsProducerProperties properties = new OnsProducerProperties();
properties.setProducerGroup(MqConstants.DEFAULT_PRODUCER_GROUP);
properties.setOnsProperties(onsProperties());
return properties;
}
private OnsProperties onsProperties() {
OnsProperties onsProperties = new OnsProperties();
onsProperties.setAccessKey("accessKey");
onsProperties.setSecretKey("secretKey");
onsProperties.setOnsAddr("http://onsaddr-internet.aliyun.com:8080/rocketmq/nsaddr4client-internal");
return onsProperties;
}
private MqConsumerProperties mqConsumerProperties() {
MqConsumerProperties properties = new MqConsumerProperties();
properties.setNamesrvAddr("localhost:9876");
properties.setConsumerGroup(MqConstants.DEFAULT_CONSUMER_GROUP);
return properties;
}
private OnsConsumerProperties onsConsumerProperties() {
OnsConsumerProperties properties = new OnsConsumerProperties();
properties.setConsumerGroup(MqConstants.DEFAULT_CONSUMER_GROUP);
properties.setOnsProperties(onsProperties());
return properties;
}
@Test
public void testMqProducer() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = MqFactory.createProducer(mqProducerProperties());
producer.start();
// if (autoCreateTopic) {//for test or demo
// producer.createTopic(producer.getCreateTopicKey(), testTopic, 4);
// }
producer.send(MessageBuilder.newBuilder().topic(testTopic).body("1").getRocketMQMessage(), 15000);
producer.shutdown();
}
@Test
public void testMqConsumer() throws MQClientException, InterruptedException {
DefaultMQPushConsumer pushConsumer = MqFactory.createPushConsumer(mqConsumerProperties(), (MessageListenerConcurrently) (msgs, context) -> {
msgs.forEach(x -> {
System.out.println(new String(x.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
pushConsumer.subscribe(testTopic, "*");
pushConsumer.start();
Thread.sleep(20_000);
pushConsumer.shutdown();
}
/**
* I'm so poor that I could not buy a ons for test
* so you should test it yourself and tell me if there are some bugs exist
*/
@Test
public void testManagerApi() {
OnsProperties onsProperties = onsProperties();
IAcsClient iAcsClient = OnsManagerApi.acsClient(onsProperties);
IClientProfile profile = OnsManagerApi.getProfile(onsProperties);
String onsRegionId = OnsManagerApi.getONSRegionId(onsProperties);
OnsManagerApi.checkConsumer(iAcsClient, "", "");
}
@Test
public void testOnsProducer() {
Producer producer = ONSFactory.createProducer(onsProducerProperties().properties());
producer.start();
producer.send(MessageBuilder.newBuilder().topic(testTopic).body("1").getOnsMessage());
producer.shutdown();
}
@Test
public void testOnsConsumer() throws InterruptedException {
Consumer consumer = ONSFactory.createConsumer(onsConsumerProperties().properties());
consumer.subscribe(testTopic, "*", (message, context) -> {
System.out.println(new String(message.getBody()));
return Action.CommitMessage;
});
consumer.start();
Thread.sleep(20_000);
consumer.shutdown();
}
if you are using springboot then you can make it easier.
<dependency>
<groupId>com.xjbg</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
rocketmq:
profile: local
product: rocketmq
producer:
namesrvAddr: localhost:9876
consumer:
namesrvAddr: localhost:9876
ons:
accessKey: accessKey
secretKey: secretKey
onsAddr: http://onsaddr-internet.aliyun.com:8080/rocketmq/nsaddr4client-internal
spring:
profiles:
active: local
use the annotation @Consumer to annotate a method of a spring bean,then a consumer will be created automatically and the message received will be processed by this method
private static final String testTopic = "message-center-topic";
@Autowired(required = false)
private DefaultMQProducer mqProducer;
@Autowired(required = false)
private ProducerBean producerBean;
@Consumer(consumerGroup = "test-group", topic = testTopic, consumerType = ConsumerType.PUSH)
public void test(String a) {
System.out.println(a);
@Test
public void run() throws Exception {
if (mqProducer != null) {
mqProducer.send(MessageBuilder.newProfileBuilder().topic(testTopic).body(1).getRocketMQMessage());
}
if (producerBean != null) {
producerBean.send(MessageBuilder.newProfileBuilder().topic(testTopic).body(1).getOnsMessage());
}
Thread.sleep(30_000);
}
- Mailing Lists: https://rocketmq.apache.org/about/contact/
- Home: https://rocketmq.apache.org
- Docs: https://rocketmq.apache.org/docs/quick-start/
- Issues: https://github.com/apache/rocketmq/issues
- Rips: https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal
- Ask: https://stackoverflow.com/questions/tagged/rocketmq
- Slack: https://rocketmq-invite-automation.herokuapp.com/
- Community: https://github.com/apache/rocketmq-externals