• Springboot整合rabbitMQ


    目录

    1.Springboot整合rabbitMQ

    2 如何确保消息的可靠性

     2.1 保证消息从生产者到交换机

    2.2 保证消息可以从交换机到队列

    2.3 如何保证消息在队列

    2.4 保证消费者可靠的消费消息

    3 如何限制消费者消费消息的条数(消费端限流)

    4 设置队列过期时间

    4.1 创建队列时为队列设置过期时间(每条信息都是相同时间)

    4.2 单独为消息设置过期时间(为某一个设置过期时间)

     5 死信队列

    6 延迟队列

    7 如何防止消息被重复消费

    8 rabbitMQ的常见面试题


    1.Springboot整合rabbitMQ

    1.创建springboot项目

    1. //rabbitmq依赖
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-amqpartifactId>
    5. dependency>

    2.创建两个子项目producer生产者服务    consumer消费者服务

    3.生产者服务

    ①配置文件application.properties

    1. server.port=7000
    2. #rabbitMQ配置 rabbitMQ所在ip
    3. spring.rabbitmq.host=192.168.1.88
    4. # 都有默认值
    5. spring.rabbitmq.port=5672
    6. spring.rabbitmq.virtual-host=/
    7. spring.rabbitmq.username=guest
    8. spring.rabbitmq.password=guest

    ②使用工具类RabbitTemplate发送消息到队列

    1. package com.wzh.producer.controller;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.GetMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. import java.util.HashMap;
    7. import java.util.Map;
    8. @RestController
    9. public class ProducerController {
    10. @Autowired
    11. private RabbitTemplate rabbitTemplate;
    12. @GetMapping("saveOrder")
    13. public void saveOrder(){
    14. System.out.println("保存订单到数据库");
    15. Map map = new HashMap<>();
    16. map.put("orderId","110");
    17. map.put("price",2500);
    18. map.put("num",3);
    19. map.put("phone","15700085997");
    20. // 交换机 路由key
    21. rabbitTemplate.convertAndSend("topic_exchange","a.orange.b",map);
    22. }
    23. }

    ③创建主启动类

    1. package com.wzh.producer;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class ProducerApp {
    6. public static void main(String[] args) {
    7. SpringApplication.run(ProducerApp.class,args);
    8. }
    9. }

    ④运行并访问7000端口 

    然后rabbitmq客户端就可以查看到,这里都是提前创建好的队列,交换机等。 

    4.消费者服务

    ①配置文件application.properties

    1. server.port=7001
    2. #rabbitMQ配置
    3. spring.rabbitmq.host=192.168.1.88
    4. spring.rabbitmq.port=5672
    5. spring.rabbitmq.virtual-host=/
    6. spring.rabbitmq.username=guest
    7. spring.rabbitmq.password=guest

    ②使用工具类RabbitTemplate发送消息到队列

    1. package com.wzh.rabbitmq;
    2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Component;
    6. import java.util.Map;
    7. @Component
    8. public class MyListener {
    9. //队列名
    10. @RabbitListener(queues = "topic_queue01")
    11. public void test(Map msg){
    12. System.out.println(msg);
    13. //进行相关的业务处理
    14. }

    ③创建主启动类

    1. package com.wzh.consumer;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class ConsumerApp {
    6. public static void main(String[] args) {
    7. SpringApplication.run(ConsumerApp.class,args);
    8. }
    9. }

    ④运行主启动类

    2 如何确保消息的可靠性

    首先确定消息可能在哪些位置丢失---不同的位置可以有不同的解决方案。

     2.1 保证消息从生产者到交换机

    confirm确认机制

    1. 手动开启确认机制(producer服务的配置文件)

            spring.rabbitmq.publisher-confirm-type=correlated
     2. 为rabbitTemplate设置确认回调函数

    1. package com.wzh;
    2. import com.wzh.producer.ProducerApp;
    3. import org.junit.jupiter.api.Test;
    4. import org.springframework.amqp.rabbit.connection.CorrelationData;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.boot.test.context.SpringBootTest;
    8. import javax.annotation.Resource;
    9. /**
    10. * @ProjectName: springboot-rabbitmq
    11. * @Package: com.wzh
    12. * @ClassName: TestRabbitMQ
    13. * @Author: 王振华
    14. * @Description:
    15. * @Date: 2022/9/20 19:56
    16. * @Version: 1.0
    17. */
    18. @SpringBootTest(classes=ProducerApp.class)
    19. public class TestRabbitMQ {
    20. //springboot集成了rabbitMQ提供了一个工具类,该类封装了消息的发送
    21. @Resource
    22. private RabbitTemplate rabbitTemplate;
    23. //测试确认机制
    24. @Test
    25. public void testConfirm(){
    26. //为rabbitTemplate设置确认回调函数
    27. //ConfirmCallback函数式接口 匿名内部类
    28. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    29. //不管是否到达交换机 都会触发该方法
    30. @Override
    31. public void confirm(CorrelationData correlationData, boolean b, String s) {
    32. if(b==false){
    33. //根据自己的业务完成相应的代码
    34. System.out.println("消息发送失败---订单回滚");
    35. }
    36. }
    37. });
    38. //这里用的是路由模式
    39. rabbitTemplate.convertAndSend("direct_exchange","info","hello springboot");
    40. }
    41. }

    查看消费者

    1. package com.wzh.consumer.rabbitmq;
    2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    3. import org.springframework.stereotype.Component;
    4. import java.util.Map;
    5. /**
    6. * @ProjectName: springboot-rabbitmq
    7. * @Package: com.wzh.consumer.rabbitmq
    8. * @ClassName: Consumer
    9. * @Author: 王振华
    10. * @Description:
    11. * @Date: 2022/9/20 15:27
    12. * @Version: 1.0
    13. */
    14. @Component
    15. public class Consumer {
    16. @RabbitListener(queues = "router_queue01")
    17. public void queue01(String s){
    18. System.out.println(s);
    19. }
    20. @RabbitListener(queues = "router_queue02")
    21. public void queue02(String s){
    22. System.out.println(s);
    23. }
    24. }

    消费者的参数必须与生产者的参数类型一致

    2.2 保证消息可以从交换机到队列

    returning机制: 如果消息无法到达队列,则会触发returning机制。如果能从交换机到队列则不会触发returning机制。

    默认rabbitMQ不开启该机制。

    ①开启returning机制(生产者配置文件)

             spring.rabbitmq.publisher-returns=true

    ②为rabbitTemplate设置returning回调函数

    1. /**
    2. * 1.开启returning机制 spring.rabbitmq.publisher-returns=true
    3. * 2.为rabbitTemplate设置returning回调函数
    4. */
    5. @Test
    6. public void testReturning(){
    7. rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    8. //该方法只有从交换机到队列失败时才会触发
    9. @Override
    10. public void returnedMessage(ReturnedMessage returned) {
    11. //根据自己的业务完成相应的代码
    12. System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~");
    13. }
    14. });
    15. //这里用的是路由模式
    16. rabbitTemplate.convertAndSend("direct_exchange","error","hello springboot2");
    17. }

     ③消费者监听结果

    2.3 如何保证消息在队列

    1. 队列持久化---》创建的时候设置持久化

    2. 搭建rabbitmq集群--保证高可用

    2.4 保证消费者可靠的消费消息

    ①修改为手动确认模式

    改为手动确认(消费者服务配置文件)

            spring.rabbitmq.listener.simple.acknowledge-mode=manual

    ②当业务处理完毕后在确认消息给队列让其删除该消息

    1. package com.wzh.consumer.rabbitmq;
    2. import com.rabbitmq.client.Channel;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. import java.io.IOException;
    7. import java.util.Map;
    8. /**
    9. * @ProjectName: springboot-rabbitmq
    10. * @Package: com.wzh.consumer.rabbitmq
    11. * @ClassName: Consumer
    12. * @Author: 王振华
    13. * @Description:
    14. * @Date: 2022/9/20 15:27
    15. * @Version: 1.0
    16. */
    17. @Component
    18. public class Consumer {
    19. @RabbitListener(queues = "router_queue01")
    20. public void queue01(String s){
    21. System.out.println(s);
    22. }
    23. @RabbitListener(queues = "router_queue02")
    24. //Message message 封装了信息类, Channel channel 信道
    25. public void queue02(Message message, Channel channel){
    26. byte[] body = message.getBody();
    27. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    28. System.out.println("接受到消息"+new String(body));
    29. try {
    30. System.out.println("处理业务代码");
    31. //这里会因为硬件问题 或其他业务代码问题 导致处理不完 出现异常了 走catch
    32. System.out.println("业务处理完毕");
    33. //设置手动确认---队列会把该信息移除。
    34. //long deliveryTag,消息的标记
    35. // boolean multiple: 是否把之前没有确认的消息以前确认掉。
    36. channel.basicAck(deliveryTag,true);
    37. }catch (Exception e){
    38. //1.让队列在给我发一次。
    39. //long deliveryTag, boolean multiple,
    40. // boolean requeue: 继续发给我 直接扔掉
    41. try {
    42. channel.basicNack(deliveryTag,true,true);
    43. } catch (IOException ioException) {
    44. ioException.printStackTrace();
    45. }
    46. }
    47. }
    48. }

    如何保证消息的可靠性。

    1. 设置confirm和returning机制

    2. 设置队列和交互机的持久化

    3. 搭建rabbitMQ服务集群

    4. 消费者改为手动确认机制。

    3 如何限制消费者消费消息的条数(消费端限流)

    如果队列里有10w条数据,消费者不管能不能消化,会直接把10w条消息都接受,会导致消费者服务垮掉

    1. 设置消费者消费消息的条数

    2. 消费者端必须为手动确认模式。

    ① 修改每次拉取消息的条数

     ②测试

     

    1. package com.wzh.consumer.rabbitmq;
    2. import com.rabbitmq.client.Channel;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. import java.io.IOException;
    7. import java.util.Map;
    8. /**
    9. * @ProjectName: springboot-rabbitmq
    10. * @Package: com.wzh.consumer.rabbitmq
    11. * @ClassName: Consumer
    12. * @Author: 王振华
    13. * @Description:
    14. * @Date: 2022/9/20 15:27
    15. * @Version: 1.0
    16. */
    17. @Component
    18. public class Consumer {
    19. @RabbitListener(queues = "test03")
    20. public void queue01(Message message, Channel channel) throws IOException {
    21. byte[] body = message.getBody();
    22. System.out.println("消息的内容:"+new String(body));
    23. //手动确认
    24. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    25. }
    26. @RabbitListener(queues = "router_queue02")
    27. //Message message 封装了信息类, Channel channel 信道
    28. public void queue02(Message message, Channel channel){
    29. byte[] body = message.getBody();
    30. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    31. System.out.println("接受到消息"+new String(body));
    32. try {
    33. System.out.println("处理业务代码");
    34. //这里会因为硬件问题 或其他业务代码问题 导致处理不完 出现异常了 走catch
    35. System.out.println("业务处理完毕");
    36. //设置手动确认---队列会把该信息移除。
    37. //long deliveryTag,消息的标记
    38. // boolean multiple: 是否把之前没有确认的消息以前确认掉。
    39. channel.basicAck(deliveryTag,true);
    40. }catch (Exception e){
    41. //1.让队列在给我发一次。
    42. //long deliveryTag, boolean multiple,
    43. // boolean requeue: 继续发给我 直接扔掉
    44. try {
    45. channel.basicNack(deliveryTag,true,true);
    46. } catch (IOException ioException) {
    47. ioException.printStackTrace();
    48. }
    49. }
    50. }
    51. }

    4 设置队列过期时间

    TTL:time to live

    可以为整个队列设置也可以单独为某条信息设置

    4.1 创建队列时为队列设置过期时间(每条信息都是相同时间)

    创建队列 

     

    创建交换机

     点击test01进行 交换机与队列绑定

     测试

    1. //测试过期时间
    2. @Test
    3. public void test01(){
    4. for(int i=0;i<10;i++) {
    5. if(i<5){
    6. //前五个 同时创建 后五个 每个间隔两秒 创建 过期时间也不一样
    7. rabbitTemplate.convertAndSend("test01", "aaa", "hello springboot"+i);
    8. }else {
    9. try{
    10. Thread.sleep(3000);
    11. } catch (InterruptedException e) {
    12. e.printStackTrace();
    13. }
    14. rabbitTemplate.convertAndSend("test01", "aaa", "hello springboot" + i);
    15. }
    16. }
    17. }

    4.2 单独为消息设置过期时间(为某一个设置过期时间)

    当前消息队列为普通队列,没有在创建队列的时候设置过期时间

    1. @Test
    2. public void test02() {
    3. MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
    4. @Override
    5. public Message postProcessMessage(Message message) throws AmqpException {
    6. message.getMessageProperties().setExpiration("10000");
    7. return message;
    8. }
    9. };
    10. rabbitTemplate.convertAndSend("test01", "aaa", "Hello Springboot",messagePostProcessor);
    11. }

     5 死信队列

     

    创建普通队列

     创建死信队列

     

     创建普通交换机

    创建死信交换机 

     队列绑定交换机

     

     死信交换机与死信队列绑定

     测试

    1. @Test
    2. public void test02() {
    3. for (int i = 0; i < 10; i++)
    4. rabbitTemplate.convertAndSend("pt_exchange", "dead", "Hello Springboot-------"+i);
    5. }
    6. }

    6 延迟队列

     这里的判断订单状态 是因为 如果支付系统第29分分钟去支付,支付的比较慢,最后在第31分钟支付成功了。消息30分钟加入死信队列执行库存回滚,就会出错。

     7 如何防止消息被重复消费

     

    8 rabbitMQ的常见面试题

    1. 如何防止消息被重复消费

    2.如何保证消息的可靠性

    3.rabbitMQ消息积压过多

     

  • 相关阅读:
    R语言生成字符串的所有成对组合:使用outer函数和paste函数生成所有字符串的成对组合、自定义获取组合结果矩阵的下三角矩阵结果
    启动Docker Desktop报 “Docker Desktop - Unexpected WSL error”
    AntX6 DAG拖拽流程图:从0到1实现流程图05-连接桩篇
    【openGauss】在windows中使用容器化的mogeaver
    【C++并发编程】(一)线程管理
    U盘插上就显示让格式化是坏了吗?
    支配树🌴学习笔记
    苹果上架Guideline 4.3 - Design
    10.cuBLAS开发指南中文版--cuBLAS中的logger配置
    Spring IoC和DI详解
  • 原文地址:https://blog.csdn.net/weixin_68509156/article/details/126959894