• 2022谷粒商城学习笔记(二十二)rabbitMQ学习


    前言

    本系列博客基于B站谷粒商城,只作为本人学习总结使用。这里我会比较注重业务逻辑的编写和相关配置的流程。有问题可以评论或者联系我互相交流。原视频地址谷粒商城雷丰阳版。本人git仓库地址Draknessssw的谷粒商城


    消息队列使用场景

    在这里插入图片描述
    在这里插入图片描述
    消息队列主要有两种形式的目的地

    1. 队列(queue):点对点消息通信(point-to-point)

    点对点式: • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获 取消息内容,消息读取后被移出队列 • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者

    2. 主题(topic):发布(publish)/订阅(subscribe)消息通信
    发布订阅式: • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个 主题,那么就会在消息到达时同时收到消息

    在这里插入图片描述
    Docker安装rabbitMQ

    docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management 4369
    
    • 1

    25672 (Erlang发现&集群端口)
    5672, 5671 (AMQP端口)
    15672 (web管理后台端口)
    61613, 61614 (STOMP协议端口)
    1883, 8883 (MQTT协议端口)


    交换机的四种分发策略

    在这里插入图片描述

    1、朴实无华的交换机绑定队列方式direct,交换机绑定一个至多个队列,交换机发送消息的binding-key需要和对应的routing-key信息一致。

    在这里插入图片描述

    2、广播模式,路由绑定队列,发送消息,不需要对应某个routing-key,向任何一个binding-key发送消息都可以发送到全部队列。

    在这里插入图片描述

    3、主题路由模式,需要指定binding-key的通配符,符合通配符规则的才向对应的队列发送消息。

    在这里插入图片描述


    消息确认机制

    在这里插入图片描述
    发送端确认

    定制rabbitTemplate

    在这里插入图片描述
    首先是消息转json

    @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
    • 1
    • 2
    • 3
    • 4

    消息正确抵达回调方法和失败回调方法

    	/**
         * 定制RabbitTemplate
         * 1、服务收到消息就会回调
         *      1、spring.rabbitmq.publisher-confirms: true
         *      2、设置确认回调
         * 2、消息正确抵达队列就会进行回调
         *      1、spring.rabbitmq.publisher-returns: true
         *         spring.rabbitmq.template.mandatory: true
         *      2、设置确认回调ReturnCallback
         *
         * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
         *
         */
        // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
        public void initRabbitTemplate() {
    
            /**
             * 1、只要消息抵达Broker就ack=true
             * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
             * ack:消息是否成功收到
             * cause:失败的原因
             */
            //设置确认回调
            rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
                System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
            });
    
    
            /**
             * 只要消息没有投递给指定的队列,就触发这个失败回调
             * message:投递失败的消息详细信息
             * replyCode:回复的状态码
             * replyText:回复的文本内容
             * exchange:当时这个消息发给哪个交换机
             * routingKey:当时这个消息用哪个路邮键
             */
            rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
                System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                        "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    rabbitMQ配置

    spring:
    	rabbitmq:
        	host: 192.168.75.129
        	port: 5672
        	# 虚拟主机
        	virtual-host: /
        	# 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】
        	publisher-confirm-type: correlated
        	# 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】
        	publisher-returns: true
        	# 消息在没有被队列接收时是否强行退回
        	template:
         		 mandatory: true
        	# 消费者手动确认模式,关闭自动确认,否则会消息丢失
        	listener:
          		simple:
            		acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    消费端可以自动确认消息的抵达
    但是,可以设置消费端手动确认消息抵达

    example

    package com.xxxx.gulimall.order.listener;
    
    import com.rabbitmq.client.Channel;
    import com.xxxx.gulimall.order.entity.OrderEntity;
    import com.xxxx.gulimall.order.service.OrderService;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    
    /**
     * @Description: 定时关闭订单
     *
     **/
    
    @RabbitListener(queues = "order.release.order.queue")
    @Service
    public class OrderCloseListener {
    
        @Autowired
        private OrderService orderService;
    
        @RabbitHandler
        public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
            System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());
            try {
                orderService.closeOrder(orderEntity);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (Exception e) {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            }
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    消息丢失处理

    在这里插入图片描述

    try {
                    //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息
                    rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
                } catch (Exception e) {
                    //TODO 定期扫描数据库,重新发送失败的消息
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    消息重复

    在这里插入图片描述


    消息积压

    在这里插入图片描述

  • 相关阅读:
    【星海出品】C++的基础(一)理论知识
    从pcap文件中提取pcma音频
    SpringBoot 实现数据脱敏
    【scikit-learn基础】--『回归模型评估』之准确率分析
    vue3项目,vite+vue3+ts+pinia(7)-axios网络请求
    【算法】贪心法
    猿创征文|工具百宝箱-代码编辑器-版本控制工具-终端神器-项目与事务跟踪工具-SFTP客户端
    深入浅出C#消息
    让 Web3 认证拥有和 Web2 一样丝滑体验的技术路径
    AlphaFold2源码解析(4)--模型架构
  • 原文地址:https://blog.csdn.net/qq_44737138/article/details/126788630