• java springBoot实现RabbitMq消息队列 生产者,消费者


    1.RabbitMq的数据源配置文件

    # RabbitMQ connection settings
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: root
        password: root
        # Publisher confirms and returns (producer-side delivery feedback).
        # NOTE(review): newer Spring Boot versions (2.2+) replace `publisher-confirms: true`
        # with `publisher-confirm-type: correlated` - confirm against the version in use.
        publisher-confirms: true
        publisher-returns: true
        listener:
          direct:
            # Consumers acknowledge messages themselves via Channel.basicAck/basicNack
            acknowledge-mode: manual
          simple:
            acknowledge-mode: manual
            # NOTE(review): the flattened listing lost its indentation; `retry` is assumed to
            # belong to the `simple` container - confirm.
            retry:
              enabled: true           # enable consumer-side retry
              max-attempts: 5         # maximum number of attempts
              initial-interval: 2000  # interval between attempts, in milliseconds

    2.maven依赖

    
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    3.RabbitMq文件目录预览

    4. RabbitMq的Action文件

    1. package com.zq.cnz.mq.constant;
    /**
     * Outcome of handling one consumed message.
     */
    public enum Action {

        /** The message was handled successfully. */
        ACCEPT,

        /** Handling failed with an error that may be retried. */
        RETRY,

        /** Handling failed with an error that must not be retried. */
        REJECT
    }

    5.RabbitMq的QueueContent文件

    1. package com.zq.cnz.mq.constant;
    /**
     * Names of the RabbitMQ queues and exchanges used by the messaging code.
     *
     * <p>This is a pure constant holder: the class is final and cannot be instantiated.
     *
     * @author 吴顺杰
     * @date 2023-11-15
     */
    public final class QueueContent {

        /** Name of the test message queue. */
        public static final String TEST_MQ_QUEUE = "test_mq_queue";

        /** Name of the test exchange. */
        public static final String TEST_MQ_QUEUE_EXCHANGE = "test_mq_queue_exchange";

        /** Name of the exchange used for delayed test messages (an exchange, not a queue). */
        public static final String TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE = "test_mq_queue_time_delay_exchange";

        private QueueContent() {
            // Constants only; never instantiated.
        }
    }

    6.消息队列生产者MessageProvider方法

    1. package com.zq.cnz.mq;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.zq.common.utils.IdUtils;
    4. import org.slf4j.Logger;
    5. import org.slf4j.LoggerFactory;
    6. import org.springframework.amqp.AmqpException;
    7. import org.springframework.amqp.core.Message;
    8. import org.springframework.amqp.core.MessagePostProcessor;
    9. import org.springframework.amqp.rabbit.connection.CorrelationData;
    10. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    11. import org.springframework.stereotype.Component;
    12. /**
    13. * 消息队列生产
    14. */
    15. @Component
    16. public class MessageProvider implements RabbitTemplate.ConfirmCallback {
    17. static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
    18. /**
    19. * RabbitMQ 模版消息实现类
    20. */
    21. protected RabbitTemplate rabbitTemplate;
    22. public MessageProvider(RabbitTemplate rabbitTemplate) {
    23. this.rabbitTemplate = rabbitTemplate;
    24. this.rabbitTemplate.setMandatory(true);
    25. this.rabbitTemplate.setConfirmCallback(this);
    26. }
    27. private String msgPojoStr;
    28. /**
    29. * 推送消息至消息队列
    30. *
    31. * @param msg
    32. * @param queueName
    33. */
    34. public void sendMqMessage(String queueName,String msg) {
    35. try {
    36. JSONObject object = JSONObject.parseObject(msg);
    37. String msgId = IdUtils.fastUUID().toString();
    38. object.put("msgId", msgId);
    39. msg = object.toString();
    40. msgPojoStr = msg;
    41. logger.info("推送消息至" + queueName + "消息队列,消息内容" + msg);
    42. rabbitTemplate.convertAndSend(queueName, msg);
    43. } catch (AmqpException e) {
    44. e.printStackTrace();
    45. logger.error("推送消息至消息队列异常 ,msg=" + msg + ",queueName=" + queueName, e);
    46. }
    47. }
    48. /**
    49. * 推送广播消息
    50. *
    51. * @param exchangeName
    52. * @param msg
    53. */
    54. public void sendFanoutMsg(String exchangeName, String msg) {
    55. try {
    56. JSONObject object = JSONObject.parseObject(msg);
    57. String msgId = IdUtils.fastUUID().toString();
    58. object.put("msgId", msgId);
    59. msg = object.toString();
    60. msgPojoStr = msg;
    61. logger.info("推送广播消息至交换机" + exchangeName + ",消息内容" + msg);
    62. rabbitTemplate.convertAndSend(exchangeName, "", msg);
    63. } catch (AmqpException e) {
    64. e.printStackTrace();
    65. logger.error("推送广播至交换机异常 ,msg=" + msg + ",exchangeName=" + exchangeName, e);
    66. }
    67. }
    68. /**
    69. * 发送延时消息
    70. *
    71. * @param queueName
    72. * @param msg
    73. */
    74. public void sendTimeDelayMsg(String queueName, String exchangeName, String msg, Integer time) {
    75. try {
    76. JSONObject object = JSONObject.parseObject(msg);
    77. String msgId = IdUtils.fastUUID().toString();
    78. object.put("msgId", msgId);
    79. msg = object.toString();
    80. msgPojoStr = msg;
    81. logger.info("推送延时消息至" + exchangeName + "," + queueName + "消息队列,消息内容" + msg + ",延时时间" + time + "秒");
    82. rabbitTemplate.convertAndSend(exchangeName, queueName, msg, new MessagePostProcessor() {
    83. @Override
    84. public Message postProcessMessage(Message message) throws AmqpException {
    85. message.getMessageProperties().setHeader("x-delay", time * 1000);
    86. return message;
    87. }
    88. });
    89. } catch (AmqpException e) {
    90. e.printStackTrace();
    91. logger.error("推送消息至消息队列异常 ,msg=" + msg + ",exchangeName=" + exchangeName + ",queueName=" + queueName
    92. + ",time=" + time, e);
    93. }
    94. }
    95. @Override
    96. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    97. if (ack) {
    98. logger.info(msgPojoStr + ":消息发送成功");
    99. } else {
    100. logger.warn(msgPojoStr + ":消息发送失败:" + cause);
    101. }
    102. }
    103. }

    7.消息队列消费者RabbitMqConfiguration文件配置

    1. package com.zq.cnz.mq;
    2. import com.zq.cnz.mq.constant.QueueContent;
    3. import org.springframework.amqp.core.*;
    4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    5. import org.springframework.amqp.rabbit.core.RabbitAdmin;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.context.annotation.Bean;
    8. import org.springframework.context.annotation.Configuration;
    9. import javax.annotation.Resource;
    10. import java.util.HashMap;
    11. import java.util.Map;
    12. @Configuration
    13. public class RabbitMqConfiguration {
    14. @Resource
    15. RabbitAdmin rabbitAdmin;
    16. // 创建初始化RabbitAdmin对象
    17. @Bean
    18. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    19. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    20. // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
    21. rabbitAdmin.setAutoStartup(true);
    22. return rabbitAdmin;
    23. }
    24. /**
    25. * 测试消息队列
    26. *
    27. * @return
    28. */
    29. @Bean
    30. public Queue TEST_QUEUE() {
    31. return new Queue(QueueContent.TEST_MQ_QUEUE);
    32. }
    33. /**
    34. * 测试交换机
    35. *
    36. * @return
    37. */
    38. @Bean
    39. FanoutExchange TEST_MQ_QUEUE_EXCHANGE() {
    40. return new FanoutExchange(QueueContent.TEST_MQ_QUEUE_EXCHANGE);
    41. }
    42. /**
    43. * 测试延迟消费交换机
    44. *
    45. * @return
    46. */
    47. @Bean
    48. public CustomExchange TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE() {
    49. Map<String, Object> args = new HashMap<>();
    50. args.put("x-delayed-type", "direct");
    51. return new CustomExchange(QueueContent.TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    52. }
    53. /**
    54. * 测试延迟消费交换机绑定延迟消费队列
    55. *
    56. * @return
    57. */
    58. @Bean
    59. public Binding banTestQueue() {
    60. return BindingBuilder.bind(TEST_QUEUE()).to(TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE()).with(QueueContent.TEST_MQ_QUEUE).noargs();
    61. }
    62. // 创建交换机和对列,跟上面的Bean的定义保持一致
    63. @Bean
    64. public void createExchangeQueue() {
    65. //测试消费队列
    66. rabbitAdmin.declareQueue(TEST_QUEUE());
    67. //测试消费交换机
    68. rabbitAdmin.declareExchange(TEST_MQ_QUEUE_EXCHANGE());
    69. //测试延迟消费交换机
    70. rabbitAdmin.declareExchange(TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE());
    71. }
    72. }

    8.TestQueueConsumer 消息队列消费+延迟消费

    1. package com.zq.cnz.mq.MessageConsumer;
    2. import com.alibaba.druid.util.StringUtils;
    3. import com.alibaba.fastjson.JSONObject;
    4. import com.rabbitmq.client.Channel;
    5. import com.zq.cnz.mq.MessageProvider;
    6. import com.zq.cnz.mq.constant.Action;
    7. import com.zq.cnz.mq.constant.QueueContent;
    8. import com.zq.common.utils.IdUtils;
    9. import com.zq.common.utils.RedisUtils;
    10. import com.zq.common.utils.spring.SpringUtils;
    11. import org.slf4j.Logger;
    12. import org.slf4j.LoggerFactory;
    13. import org.springframework.amqp.core.Message;
    14. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    15. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    16. import org.springframework.beans.factory.annotation.Autowired;
    17. import org.springframework.stereotype.Component;
    18. import java.io.IOException;
    19. /**
    20. * 测试消息队列消费
    21. */
    22. @Component
    23. @RabbitListener(queues = QueueContent.TEST_MQ_QUEUE)
    24. public class TestQueueConsumer {
    25. @Autowired
    26. private RedisUtils redisUtils;
    27. static final Logger logger = LoggerFactory.getLogger(TestQueueConsumer.class);
    28. @RabbitHandler
    29. public void handler(String msg, Channel channel, Message message) throws IOException {
    30. if (!StringUtils.isEmpty(msg)) {
    31. JSONObject jsonMsg = JSONObject.parseObject(msg);
    32. // logger.info("TestQueueConsumer:"+jsonMsg.toJSONString());
    33. Action action = Action.RETRY;
    34. // 获取消息ID
    35. String msgId = jsonMsg.getString("msgId");
    36. // 消费次数+1
    37. redisUtils.incr("MQ_MSGID:" + msgId, 1);
    38. redisUtils.expire("MQ_MSGID:" + msgId, 60);
    39. try {
    40. logger.info("测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
    41. action = Action.ACCEPT;
    42. } catch (Exception e) {
    43. logger.error("MQ_MSGID:" + msgId + ",站控权限请求关闭接口异常,msg=" + msg, e);
    44. } finally {
    45. // 通过finally块来保证Ack/Nack会且只会执行一次
    46. if (action == Action.ACCEPT) {
    47. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    48. } else if (action == Action.RETRY) {
    49. // 判断当前消息消费次数,已经消费3次则放弃消费
    50. if ((Integer) redisUtils.get("MQ_MSGID:" + msgId) >= 3) {
    51. logger.error("MQ_MSGID:" + msgId + ",异步处理超出失败次数限制,msg=" + msg);
    52. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    53. } else {
    54. // 回归队列重新消费
    55. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    56. }
    57. } else {
    58. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    59. }
    60. }
    61. }
    62. }
    63. }

    9.TestExchangeConsumer 交换机广播模式 

    1. package com.zq.cnz.mq.MessageConsumer;
    2. import com.alibaba.fastjson.JSON;
    3. import com.alibaba.fastjson.JSONArray;
    4. import com.alibaba.fastjson.JSONObject;
    5. import com.alibaba.fastjson.serializer.SerializerFeature;
    6. import com.rabbitmq.client.Channel;
    7. import com.zq.cnz.mq.constant.Action;
    8. import com.zq.cnz.mq.constant.QueueContent;
    9. import com.zq.common.utils.IdUtils;
    10. import com.zq.common.utils.RedisUtils;
    11. import com.zq.common.utils.StringUtils;
    12. import com.zq.common.utils.spring.SpringUtils;
    13. import org.slf4j.Logger;
    14. import org.slf4j.LoggerFactory;
    15. import org.springframework.amqp.core.ExchangeTypes;
    16. import org.springframework.amqp.core.Message;
    17. import org.springframework.amqp.rabbit.annotation.*;
    18. import org.springframework.beans.factory.annotation.Autowired;
    19. import org.springframework.stereotype.Component;
    20. import javax.annotation.Resource;
    21. import java.io.IOException;
    22. import java.util.List;
    23. /**
    24. * 测试交换机消费
    25. */
    26. @Component
    27. @RabbitListener(bindings = @QueueBinding(value = @Queue(), exchange = @Exchange(value = QueueContent.TEST_MQ_QUEUE_EXCHANGE, type = ExchangeTypes.FANOUT)))
    28. public class TestExchangeConsumer {
    29. static final Logger logger = LoggerFactory.getLogger(TestExchangeConsumer.class);
    30. @Resource
    31. private RedisUtils redisUtils;
    32. @RabbitHandler
    33. public void handler(String msg, Channel channel, Message message) throws IOException {
    34. if (!StringUtils.isEmpty(msg)) {
    35. // logger.info("接收交换机生产者消息:{}", msg);
    36. Action action = Action.ACCEPT;
    37. // 请求参数
    38. JSONObject jsonMsg = JSONObject.parseObject(msg);
    39. // 获取消息ID
    40. String msgId = jsonMsg.getString("msgId");
    41. // 消费次数+1
    42. redisUtils.incr("MQ_MSGID:" + msgId, 1);
    43. redisUtils.expire("MQ_MSGID:" + msgId, 60);
    44. try {
    45. Integer CMD = jsonMsg.getInteger("cmd");
    46. if (CMD==1) {
    47. logger.info("cmd1测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
    48. }else if(CMD==2){
    49. logger.info("cmd2测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));
    50. }
    51. action = Action.ACCEPT;
    52. } catch (Exception e) {
    53. action = Action.REJECT;
    54. e.printStackTrace();
    55. } finally {
    56. // 通过finally块来保证Ack/Nack会且只会执行一次
    57. if (action == Action.ACCEPT) {
    58. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    59. } else if (action == Action.RETRY) {
    60. // 判断当前消息消费次数,已经消费3次则放弃消费
    61. if ((Integer) redisUtils.get("MQ_MSGID:" + msgId) >= 3) {
    62. logger.error("MQ_MSGID::" + msgId + ",换电失败消息队列消费了三次,msg=" + msg);
    63. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    64. } else {
    65. // 回归队列重新消费
    66. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    67. }
    68. } else {
    69. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    70. }
    71. }
    72. }
    73. }
    74. }

    运行项目 调用RabbitmqTestController生产RabbitMq消息体, TestExchangeConsumer和TestQueueConsumer自动消费

    1. package com.zq.web.controller.tool;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.zq.cnz.mq.MessageProvider;
    4. import com.zq.cnz.mq.constant.QueueContent;
    5. import com.zq.common.utils.IdUtils;
    6. import org.springframework.web.bind.annotation.GetMapping;
    7. import org.springframework.web.bind.annotation.RequestMapping;
    8. import org.springframework.web.bind.annotation.RestController;
    9. import javax.annotation.Resource;
    10. /**
    11. * 消息队列测试
    12. */
    13. @RestController
    14. @RequestMapping("/test/mq")
    15. public class RabbitmqTestController {
    16. @Resource
    17. private MessageProvider messageProvider;
    18. /**
    19. * 查询储能站信息列表
    20. */
    21. @GetMapping("/putMq")
    22. public void putMq(){
    23. JSONObject obj=new JSONObject();
    24. obj.put("test","测试数据");
    25. //推送消息至消息队列
    26. messageProvider.sendMqMessage(QueueContent.TEST_MQ_QUEUE,obj.toString());
    27. obj.put("cmd",1);
    28. obj.put("test","这是广播消费");
    29. //推送广播消息
    30. messageProvider.sendFanoutMsg(QueueContent.TEST_MQ_QUEUE_EXCHANGE,obj.toString());
    31. //发送延时消息
    32. obj.put("cmd",2);
    33. obj.put("test","这是延迟消费");
    34. messageProvider.sendTimeDelayMsg(QueueContent.TEST_MQ_QUEUE,QueueContent.TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE,obj.toString(),2*60);
    35. }
    36. }

  • 相关阅读:
    adb and 软件架构笔记
    肖sir__设计测试用例方法之_(白盒测试)
    Prometheus Operator 通过additional 添加target
    时间序列预测:用电量预测 04 Std_Linear(多元线性回归算法 & 数据标准化)
    BurpSuite官方实验室之逻辑漏洞
    突发技术故障对工作进程的影响及其应对策略——以电脑硬盘损坏为例
    java学习day17(Java核心类库)IO流
    【ORACLE】Oracle里有“time”数据类型吗?--关于对Oracle数据类型的一点研究
    javase----java基础面试题01-05
    电脑监控软件:保护企业核心信息资产,防止数据泄露
  • 原文地址:https://blog.csdn.net/qq_37557563/article/details/134422240