• (十九)ATP应用测试平台——springboot集成RocketMQ案例实战


    前言

    本节内容是关于RocketMQ消息中间键的实战内容,主要介绍在springboot项目中如何集成使用RocketMQ消息中间键,包括消息的发送、消息的接收以及RocketMQ的一些配置说明,以及效果说明。话不多说,开始实战内容。

    正文

    • 搭建RocketMQ集群

    参考博客:Docker环境下使用docker-compose一键式搭建RocketMQ(4.5.0版本)集群及其管理工具(外网版)_北溟溟的博客-CSDN博客

    • 引入RocketMQ的pom依赖

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-spring-boot-starter</artifactId>
    4. <version>2.2.2</version>
    5. </dependency>

    •  在配置文件application.yml引入RocketMQ配置 
    1. #rocketmq配置
    2. rocketmq:
    3. name-server: 192.168.56.10:9876;192.168.56.10:9877
    4. producer:
    5. #生产者组名称
    6. group: atp-producer
    7. #命名空间
    8. namespace: atp
    9. #异步消息发送失败重试次数,默认是2
    10. retry-times-when-send-async-failed: 2
    11. #发送消息超时时间,默认2000ms
    12. send-message-timeout: 2000
    13. #消息的最大长度:默认1024 * 1024 * 4(默认4M)
    14. max-message-size: 40000000
    15. #压缩消息阈值,超过4k就压缩
    16. compress-message-body-threshold: 4096
    17. #是否发送失败,重试另外的broker
    18. retry-next-server: false
    19. #是否启用消息追踪
    20. enable-msg-trace: false
    21. #默认追踪的主题
    22. customized-trace-topic: RMQ_SYS_TRACE_TOPIC
    23. #消息发送失败重试的次数
    24. retry-times-when-send-failed: 2

    •  创建消息的生产者

    ①创建RocketMQ消息的生产者RocketProducer.java

    1. package com.yundi.atp.platform.rocketmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.rocketmq.client.producer.*;
    4. import org.apache.rocketmq.spring.core.RocketMQTemplate;
    5. import org.apache.rocketmq.spring.support.RocketMQHeaders;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.messaging.Message;
    8. import org.springframework.messaging.support.MessageBuilder;
    9. import org.springframework.stereotype.Component;
    10. @Component
    11. @Slf4j
    12. public class RocketProducer {
    13. @Autowired
    14. private RocketMQTemplate rocketMQTemplate;
    15. /**
    16. * 发送同步消息:消息响应后发送下一条消息
    17. *
    18. * @param topic 消息主题
    19. * @param tag 消息tag
    20. * @param key 业务号
    21. * @param data 消息内容
    22. */
    23. public void sendSyncMsg(String topic, String tag, String key, String data) {
    24. //消息
    25. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
    26. //主题
    27. String destination = topic + ":" + tag;
    28. SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
    29. log.info("【RocketMQ】发送同步消息:{}", sendResult);
    30. }
    31. /**
    32. * 发送异步消息:异步回调通知消息发送的状况
    33. *
    34. * @param topic 消息主题
    35. * @param tag 消息tag
    36. * @param key 业务号
    37. * @param data 消息内容
    38. */
    39. public void sendAsyncMsg(String topic, String tag, String key, String data) {
    40. //消息
    41. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
    42. //主题
    43. String destination = topic + ":" + tag;
    44. rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
    45. @Override
    46. public void onSuccess(SendResult sendResult) {
    47. log.info("【RocketMQ】发送异步消息:{}", sendResult);
    48. }
    49. @Override
    50. public void onException(Throwable e) {
    51. log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());
    52. }
    53. });
    54. }
    55. /**
    56. * 发送单向消息:消息发送后无响应,可靠性差,效率高
    57. *
    58. * @param topic 消息主题
    59. * @param tag 消息tag
    60. * @param key 业务号
    61. * @param data 消息内容
    62. */
    63. public void sendOneWayMsg(String topic, String tag, String key, String data) {
    64. //消息
    65. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
    66. //主题
    67. String destination = topic + ":" + tag;
    68. rocketMQTemplate.sendOneWay(destination, message);
    69. }
    70. /**
    71. * 同步延迟消息
    72. *
    73. * @param topic 主题
    74. * @param tag 标签
    75. * @param key 业务号
    76. * @param data 消息体
    77. * @param timeout 发送消息的过期时间
    78. * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    79. */
    80. public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
    81. //消息
    82. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
    83. //主题
    84. String destination = topic + ":" + tag;
    85. SendResult sendResult = rocketMQTemplate.syncSend(destination, message, timeout, delayLevel);
    86. log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);
    87. }
    88. /**
    89. * 异步延迟消息
    90. *
    91. * @param topic 主题
    92. * @param tag 标签
    93. * @param key 业务号
    94. * @param data 消息体
    95. * @param timeout 发送消息的过期时间
    96. * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    97. */
    98. public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
    99. //消息
    100. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
    101. //主题
    102. String destination = topic + ":" + tag;
    103. rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
    104. @Override
    105. public void onSuccess(SendResult sendResult) {
    106. log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);
    107. }
    108. @Override
    109. public void onException(Throwable e) {
    110. log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());
    111. }
    112. }, timeout, delayLevel);
    113. }
    114. /**
    115. * 同步顺序消息
    116. *
    117. * @param topic 主题
    118. * @param tag 标签
    119. * @param key 业务号
    120. * @param data 消息体
    121. */
    122. public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {
    123. //消息
    124. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
    125. //主题
    126. String destination = topic + ":" + tag;
    127. SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, key);
    128. log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);
    129. }
    130. /**
    131. * 异步顺序消息
    132. *
    133. * @param topic 主题
    134. * @param tag 标签
    135. * @param key 业务号
    136. * @param data 消息体
    137. */
    138. public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {
    139. //消息
    140. Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
    141. //主题
    142. String destination = topic + ":" + tag;
    143. rocketMQTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {
    144. @Override
    145. public void onSuccess(SendResult sendResult) {
    146. log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);
    147. }
    148. @Override
    149. public void onException(Throwable e) {
    150. log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());
    151. }
    152. });
    153. }
    154. }

    ②创建消息发送的web接口

    1. package com.yundi.atp.platform.module.test.controller;
    2. import com.yundi.atp.platform.common.Result;
    3. import com.yundi.atp.platform.rocketmq.RocketConstant;
    4. import com.yundi.atp.platform.rocketmq.RocketProducer;
    5. import io.swagger.annotations.Api;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.web.bind.annotation.GetMapping;
    8. import org.springframework.web.bind.annotation.PathVariable;
    9. import org.springframework.web.bind.annotation.RequestMapping;
    10. import org.springframework.web.bind.annotation.RestController;
    11. import java.util.UUID;
    12. @Api(tags = {"Springboot集成RocketMQ测试"})
    13. @RestController
    14. @RequestMapping(value = "/rocket")
    15. public class RocketContoller {
    16. @Autowired
    17. private RocketProducer rocketProducer;
    18. @GetMapping(value = "/sendRocketTestMsg/{topic}/{msg}")
    19. public Result sendKafkaTestMsg(@PathVariable(value = "topic") String topic, @PathVariable(value = "msg") String msg) {
    20. rocketProducer.sendSyncMsg(topic, RocketConstant.ROCKET_TAG, UUID.randomUUID().toString(), msg);
    21. return Result.success();
    22. }
    23. }

    ③创建前端消息发送界面

    • 创建消息的消费者

    ①创建消费者RocketConsumer.java

    1. package com.yundi.atp.platform.rocketmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    4. import org.apache.rocketmq.spring.core.RocketMQListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @Description: rocket消费者
    8. *
    9. * consumerGroup:消费者组
    10. * topic:消费主题
    11. * selectorType: 选则器模式,默认SelectorType.TAG
    12. * selectorExpression:消费模式的值
    13. * consumeMode:消费者模式 CONCURRENTLY(默认消费模式) ORDERLY(顺序消费)
    14. * messageModel:消息模式 CLUSTERING(集群模式) BROADCASTING(广播模式)
    15. * consumeThreadNumber:消费者线程数
    16. * maxReconsumeTimes:最大重复消费次数
    17. * consumeTimeout:消费过期时间
    18. * replyTimeout:重试时间
    19. * enableMsgTrace:是否允许消息追踪
    20. * customizedTraceTopic:自定义的消息追踪主题
    21. * nameServer:注册服务器
    22. * namespace:名称空间
    23. *
    24. * @Date: 2021/11/3 18:31
    25. * @Version: 1.0.0
    26. */
    27. @Slf4j
    28. @Component
    29. @RocketMQMessageListener(consumerGroup = RocketConstant.ROCKET_CONSUMER_GROUP,
    30. topic = RocketConstant.ROCKET_TOPIC,
    31. selectorExpression = RocketConstant.ROCKET_TAG,
    32. namespace = RocketConstant.ROCKET_NAMESPACE)
    33. public class RocketConsumer implements RocketMQListener {
    34. @Override
    35. public void onMessage(String message) {
    36. log.info("message-------------------------------------->:{}", message);
    37. }
    38. }

    ②消息常量定义RocketConstant.java

    1. package com.yundi.atp.platform.rocketmq;
    2. /**
    3. * @Description: rocketmq常量定义
    4. * @Date: 2022/10/20 14:24
    5. * @Version: 1.0.0
    6. */
    7. public class RocketConstant {
    8. /**
    9. * 消费者组
    10. */
    11. public final static String ROCKET_CONSUMER_GROUP = "atp-consumer";
    12. /**
    13. * 主题
    14. */
    15. public final static String ROCKET_TOPIC = "atp";
    16. /**
    17. * tag
    18. */
    19. public final static String ROCKET_TAG = "app";
    20. /**
    21. * 名称空间
    22. */
    23. public final static String ROCKET_NAMESPACE = "atp";
    24. }

    •  验证结果

    ①启动RocketMQ集群,保证RocketMQ是启动状态

    ②启动后端与前端服务

    ③发送消息测试,控制台有消息,后端也可以消费到消息

     

    结语

    至此,关于springboot集成RocketMQ案例实战,我们下期见。。。

  • 相关阅读:
    STM32_3(GPIO)
    c++面试八股文
    Android音频框架之OpenSL ES、AAudio、Oboe介绍(二)
    DC电源模块对效率有什么要求?
    【云原生】kubernetes中pod的生命周期、探测钩子的实战应用案例解析
    代码管理工具-Git基础介绍及常用技巧
    构建强大的产品级NLP系统:PaddleNLP Pipelines端到端流水线框架解析
    Linux——进程控制之替换
    二次规划的基础知识理论(更新中)
    C#练习题7和8
  • 原文地址:https://blog.csdn.net/yprufeng/article/details/127432380