本节内容是关于RocketMQ消息中间键的实战内容,主要介绍在springboot项目中如何集成使用RocketMQ消息中间键,包括消息的发送、消息的接收以及RocketMQ的一些配置说明,以及效果说明。话不多说,开始实战内容。
参考博客:Docker环境下使用docker-compose一键式搭建RocketMQ(4.5.0版本)集群及其管理工具(外网版)_北溟溟的博客-CSDN博客
引入RocketMQ的pom依赖
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.2</version>
- </dependency>
- #rocketmq配置
- rocketmq:
- name-server: 192.168.56.10:9876;192.168.56.10:9877
- producer:
- #生产者组名称
- group: atp-producer
- #命名空间
- namespace: atp
- #异步消息发送失败重试次数,默认是2
- retry-times-when-send-async-failed: 2
- #发送消息超时时间,默认2000ms
- send-message-timeout: 2000
- #消息的最大长度:默认1024 * 1024 * 4(默认4M)
- max-message-size: 40000000
- #压缩消息阈值,超过4k就压缩
- compress-message-body-threshold: 4096
- #是否发送失败,重试另外的broker
- retry-next-server: false
- #是否启用消息追踪
- enable-msg-trace: false
- #默认追踪的主题
- customized-trace-topic: RMQ_SYS_TRACE_TOPIC
- #消息发送失败重试的次数
- retry-times-when-send-failed: 2
①创建RocketMQ消息的生产者RocketProducer.java
package com.yundi.atp.platform.rocketmq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component @Slf4j public class RocketProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送同步消息:消息响应后发送下一条消息 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendSyncMsg(String topic, String tag, String key, String data) { //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSend(destination, message); log.info("【RocketMQ】发送同步消息:{}", sendResult); } /** * 发送异步消息:异步回调通知消息发送的状况 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendAsyncMsg(String topic, String tag, String key, String data) { //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSend(destination, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("【RocketMQ】发送异步消息:{}", sendResult); } @Override public void onException(Throwable e) { log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage()); } }); } /** * 发送单向消息:消息发送后无响应,可靠性差,效率高 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendOneWayMsg(String topic, String tag, String key, String data) { //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.sendOneWay(destination, message); } /** * 同步延迟消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 * @param timeout 发送消息的过期时间 * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) { //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSend(destination, message, timeout, delayLevel); log.info("【RocketMQ】发送同步延迟消息:{}", sendResult); } /** * 异步延迟消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 * @param timeout 发送消息的过期时间 * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) { //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSend(destination, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("【RocketMQ】发送异步延迟消息:{}", sendResult); } @Override public void onException(Throwable e) { log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage()); } }, timeout, delayLevel); } /** * 同步顺序消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 */ public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) { //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, key); log.info("【RocketMQ】发送同步顺序消息:{}", sendResult); } /** * 异步顺序消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 */ public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) { //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSendOrderly(destination, message, key, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("【RocketMQ】发送异步顺序消息:{}", sendResult); } @Override public void onException(Throwable e) { log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage()); } }); } }②创建消息发送的web接口
package com.yundi.atp.platform.module.test.controller; import com.yundi.atp.platform.common.Result; import com.yundi.atp.platform.rocketmq.RocketConstant; import com.yundi.atp.platform.rocketmq.RocketProducer; import io.swagger.annotations.Api; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @Api(tags = {"Springboot集成RocketMQ测试"}) @RestController @RequestMapping(value = "/rocket") public class RocketContoller { @Autowired private RocketProducer rocketProducer; @GetMapping(value = "/sendRocketTestMsg/{topic}/{msg}") public Result sendKafkaTestMsg(@PathVariable(value = "topic") String topic, @PathVariable(value = "msg") String msg) { rocketProducer.sendSyncMsg(topic, RocketConstant.ROCKET_TAG, UUID.randomUUID().toString(), msg); return Result.success(); } }
③创建前端消息发送界面
"container"> "title"> Springboot集成RocketMQ案例"vertical"> "home"> "font-size: 18px;">退出Test Staring "width: 400px;background: #ddd;padding: 40px 20px;">"form" :model="form" label-width="70px" class="login"> "发送主题" prop="topic"> "form.topic"> "发送消息" prop="msg"> "form.msg"> "primary" @click="sengMsg" style="width: 100%;margin: 0;">立即发送 export default { name: "RocketMQ", data() { return { form: { topic: 'atp', msg: 'hello world!', } } }, methods: { sengMsg() { const _this = this; _this.$http.get('/rocket/sendRocketTestMsg/'+this.form.topic+"/"+this.form.msg,).then(res => { if (res.data.code === 1) { _this.$refs['form'].resetFields(); _this.$message.success(res.data.msg); } else { _this.$refs['form'].resetFields(); _this.$message.warning(res.data.msg); } }).catch(error => { _this.$message.error(error); }); } } }
①创建消费者RocketConsumer.java
package com.yundi.atp.platform.rocketmq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * @Description: rocket消费者 * * consumerGroup:消费者组 * topic:消费主题 * selectorType: 选则器模式,默认SelectorType.TAG * selectorExpression:消费模式的值 * consumeMode:消费者模式 CONCURRENTLY(默认消费模式) ORDERLY(顺序消费) * messageModel:消息模式 CLUSTERING(集群模式) BROADCASTING(广播模式) * consumeThreadNumber:消费者线程数 * maxReconsumeTimes:最大重复消费次数 * consumeTimeout:消费过期时间 * replyTimeout:重试时间 * enableMsgTrace:是否允许消息追踪 * customizedTraceTopic:自定义的消息追踪主题 * nameServer:注册服务器 * namespace:名称空间 * * @Date: 2021/11/3 18:31 * @Version: 1.0.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = RocketConstant.ROCKET_CONSUMER_GROUP, topic = RocketConstant.ROCKET_TOPIC, selectorExpression = RocketConstant.ROCKET_TAG, namespace = RocketConstant.ROCKET_NAMESPACE) public class RocketConsumer implements RocketMQListener{ @Override public void onMessage(String message) { log.info("message-------------------------------------->:{}", message); } }②消息常量定义RocketConstant.java
package com.yundi.atp.platform.rocketmq; /** * @Description: rocketmq常量定义 * @Date: 2022/10/20 14:24 * @Version: 1.0.0 */ public class RocketConstant { /** * 消费者组 */ public final static String ROCKET_CONSUMER_GROUP = "atp-consumer"; /** * 主题 */ public final static String ROCKET_TOPIC = "atp"; /** * tag */ public final static String ROCKET_TAG = "app"; /** * 名称空间 */ public final static String ROCKET_NAMESPACE = "atp"; }
①启动RocketMQ集群,保证RocketMQ是启动状态
②启动后端与前端服务
③发送消息测试,控制台有消息,后端也可以消费到消息
至此,关于springboot集成RocketMQ案例实战,我们下期见。。。