RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。
- <dependency>
-     <groupId>org.apache.rocketmq</groupId>
-     <artifactId>rocketmq-spring-boot-starter</artifactId>
-     <version>2.0.4</version>
- </dependency>
application.yml 文件中添加如下配置:
- rocketmq:
-   name-server: 192.168.152.165:9876
-   producer:
-     group: my-group
- package com.rocketmq.springbootrocketmq;
-
-
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.test.context.junit4.SpringRunner;
-
- import java.util.concurrent.TimeUnit;
-
-
- /**
-  * Spring Boot tests that publish to the "helloTopicBoot" topic through the injected
-  * {@link RocketMQTemplate}, one test per send style (plain send, async, one-way).
-  */
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class T {
-
-     @Autowired
-     private RocketMQTemplate rocketMQTemplate;
-
-     /** Synchronous message. */
-     @Test
-     public void testRocketMQ() {
-         Message<String> msg = MessageBuilder.withPayload("boot发送同步消息").build();
-         rocketMQTemplate.send("helloTopicBoot", msg);
-         System.out.println("success send");
-     }
-
-     /**
-      * Asynchronous message: the outcome is reported through the {@link SendCallback}.
-      *
-      * @throws InterruptedException if the wait for the callback is interrupted
-      */
-     @Test
-     public void sendASYCMsg() throws InterruptedException {
-         Message<String> message = MessageBuilder.withPayload("boot发送异步消息").build();
-         rocketMQTemplate.asyncSend("helloTopicBoot", message, new SendCallback() {
-             @Override
-             public void onSuccess(SendResult sendResult) {
-                 System.out.println("发送状态:" + sendResult.getSendStatus());
-             }
-
-             @Override
-             public void onException(Throwable throwable) {
-                 // Include the cause; a bare "failed" line makes broker/network problems undiagnosable.
-                 System.out.println("消息发送失败:" + throwable);
-             }
-         });
-         // Give the asynchronous callback time to fire before the test method returns.
-         TimeUnit.SECONDS.sleep(5);
-     }
-
-     /** One-way message: sent without a result or callback. */
-     @Test
-     public void sendOneWayRocketMQ() {
-         Message<String> msg = MessageBuilder.withPayload("boot发送一次性消息").build();
-         rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
-     }
-
- }
消费者:
- package com.example.springbooTRocketMQConsumer.listener;
-
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- import java.nio.charset.Charset;
-
- @Component
- @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
- public class HelloTopicListener implements RocketMQListener
{ -
- @Override
- public void onMessage(MessageExt messageExt) {
- System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));
- }
- }
消费者:messageModel = MessageModel.BROADCASTING
- package com.example.springbooTRocketMQConsumer.listener;
-
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.spring.annotation.MessageModel;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- import java.nio.charset.Charset;
-
- @Component
- @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING)
- public class HelloTopicListener implements RocketMQListener
{ -
- @Override
- public void onMessage(MessageExt messageExt) {
- System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));
- }
- }
- /**
-  * Ordered messages: every step of one order is routed to the same queue, chosen from
-  * the order id that is passed as the hash key of {@code sendOneWayOrderly}.
-  */
- @Test
- public void sendOrderlyMsg() {
-     // Install the queue selector on the template.
-     rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
-         @Override
-         public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
-             long orderId = Long.parseLong((String) o);
-             // Take the modulo on the long value. Casting to int first (as "(int) orderId % size"
-             // does) truncates ids above Integer.MAX_VALUE and can yield a negative index;
-             // floorMod also keeps the index non-negative for negative ids.
-             int index = (int) Math.floorMod(orderId, (long) list.size());
-             return list.get(index);
-         }
-     });
-
-     List<OrderStep> orderSteps = OrderUtil.buildOrders();
-     for (OrderStep orderStep : orderSteps) {
-         Message<String> msg = MessageBuilder.withPayload(orderStep.toString()).build();
-         rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot", msg, String.valueOf(orderStep.getOrderId()));
-     }
- }
- package com.example.springbooTRocketMQConsumer.listener;
-
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.spring.annotation.ConsumeMode;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- import java.nio.charset.Charset;
-
- @Component
- @RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot",topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY)
- public class OrderlyTopicListener implements RocketMQListener
{ -
- @Override
- public void onMessage(MessageExt messageExt) {
- System.out.println("当前线程:" + Thread.currentThread() + "队列ID"+messageExt.getQueueId() + ",消息内容:" + new String(messageExt.getBody(),Charset.defaultCharset()));
- }
- }
- //延迟消息
- @Test
- public void sendDelayRocketMQ() {
- Message msg = MessageBuilder.withPayload("boot发送延时消息,发送时间:"+new Date()).build();
- rocketMQTemplate.syncSend("helloTopicBoot", msg,3000,3);
- }
-
- package com.example.springbooTRocketMQConsumer.listener;
-
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.spring.annotation.MessageModel;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- import java.nio.charset.Charset;
- import java.util.Date;
-
- @Component
- @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
- public class DelayTopicListener implements RocketMQListener
{ -
- @Override
- public void onMessage(MessageExt messageExt) {
- System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
- }
- }
- /**
-  * Tag messages: sends one message each to "tagFilterBoot" with TagA, TagB and TagC,
-  * using the "topic:tag" destination form.
-  */
- @Test
- public void sendTagFilterRocketMQ() {
-     Message<String> msg1 = MessageBuilder.withPayload("消息A").build();
-     rocketMQTemplate.sendOneWay("tagFilterBoot:TagA", msg1);
-
-     Message<String> msg2 = MessageBuilder.withPayload("消息B").build();
-     rocketMQTemplate.sendOneWay("tagFilterBoot:TagB", msg2);
-
-     Message<String> msg3 = MessageBuilder.withPayload("消息C").build();
-     rocketMQTemplate.sendOneWay("tagFilterBoot:TagC", msg3);
- }
- package com.example.springbooTRocketMQConsumer.listener;
-
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.spring.annotation.MessageModel;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- import java.nio.charset.Charset;
- import java.util.Date;
-
- @Component
- @RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot",topic = "tagFilterBoot",selectorExpression = "TagA || TagC")
- public class TagFilterTopicListener implements RocketMQListener
{ -
- @Override
- public void onMessage(MessageExt messageExt) {
- System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
- }
- }
- /**
-  * SQL92 messages: sends three messages to "SQL92FilterBoot", each carrying "age" and
-  * "weight" headers for a consumer-side SQL92 selector to filter on.
-  */
- @Test
- public void sendSQL92FilterRocketMQ() {
-     Message<String> msg1 = MessageBuilder.withPayload("小红,年龄22,体重45").setHeader("age", "22").setHeader("weight", 45).build();
-     rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg1);
-
-     Message<String> msg2 = MessageBuilder.withPayload("小明,年龄25,体重60").setHeader("age", "25").setHeader("weight", 60).build();
-     rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg2);
-
-     Message<String> msg3 = MessageBuilder.withPayload("小蓝,年龄40,体重70").setHeader("age", "40").setHeader("weight", 70).build();
-     rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg3);
- }
- package com.example.springbooTRocketMQConsumer.listener;
-
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.annotation.SelectorType;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- import java.nio.charset.Charset;
- import java.util.Date;
-
- @Component
- @RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot",topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92,selectorExpression = "age > 23 and weight > 60")
- public class SQL92FilterTopicListener implements RocketMQListener
{ -
- @Override
- public void onMessage(MessageExt messageExt) {
- System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
- }
- }