• rabbitmq+springboot实现幂等性操作


    文章目录

    1.场景描述

    消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。

    1.1 场景1

    什么意思呢?举个例子:一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
    这种情景就会出现消息可能被多次地投递。

    1.2 场景2

    还有一种场景是程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

    以上两个场景对于消息队列来说就是同一个messageId的消息重复投递下来了。

    我们利用消息id来判断消息是否已经消费过,如果该信息被消费过,那么消息表中已经 会有一条数据,由于消费时会先执行插入操作,此时会因为主键冲突无法重复插入,我们就利用这个原理来进行幂等的控制,消息内容可以用json格式来进行传输的。

    3.实战开发

    3.1 建表

    DROP TABLE IF EXISTS `message_idempotent`;
    CREATE TABLE `message_idempotent` (
      `message_id` varchar(50) NOT NULL COMMENT '消息ID',
      `message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',
      `status` int DEFAULT '0' COMMENT '消费状态(0-未消费成功;1-消费成功)',
      `retry_times` int DEFAULT '0' COMMENT '重试次数',
      `type` int DEFAULT '0' COMMENT '消费类型',
      PRIMARY KEY (`message_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3.2 集成mybatis-plus

    《springBoot集成mybatisPlus》

    3.3 集成RabbitMq

    3.3.1 安装mq

    推荐使用docker安装rabbitmq,还未安装的可以参考以下信息:

    3.3.2 springBoot集成mq

    • 1.添加依赖
     
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.4 生产者具体实现

    3.4.1 mq配置类

    import org.springframework.amqp.core.\*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitmqConfig {
    
        //正常交换机的名字
        public final static String  EXCHANGE\_NAME = "exchange\_name";
        //正常队列的名字
        public final static String QUEUE\_NAME="queue\_name";
        //死信交换机的名字
        public final static String  EXCHANGE\_DEAD = "exchange\_dead";
        //死信队列的名字
        public final static String QUEUE\_DEAD="queue\_dead";
        //死信路由key
        public final static String DEAD\_KEY="dead.key";
    
    
    
    
        //创建正常交换机
        @Bean(EXCHANGE\_NAME)
        public Exchange exchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)
                    //持久化 mq重启后数据还在
                    .durable(true)
                    .build();
        }
    
    
    
        //创建正常队列
        @Bean(QUEUE\_NAME)
        public Queue queue(){
            //正常队列和死信进行绑定 转发到 死信队列,配置参数
            Map<String,Object>map=getMap();
            return new Queue(QUEUE\_NAME,true,false,false,map);
        }
    
        //正常队列绑定正常交换机 设置规则 执行绑定 定义路由规则 requestmaping映射
        @Bean
        public Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,
                               @Qualifier(EXCHANGE\_NAME) Exchange exchange){
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    //路由规则
                    .with("app.#")
                    .noargs();
        }
    
        //创建死信队列
        @Bean(QUEUE\_DEAD)
        public Queue queueDead(){
            return new Queue(QUEUE\_DEAD,true,false,false);
        }
    
        //创建死信交换机
        @Bean(EXCHANGE\_DEAD)
        public Exchange exchangeDead(){
            return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD)
                    .durable(true) //持久化 mq重启后数据还在
                    .build();
        }
    
    
        //绑定死信队列和死信交换机
        @Bean
        public Binding deadBinding(){
            return BindingBuilder.bind(queueDead())
                    .to(exchangeDead())
                    //路由规则 正常路由key
                    .with(DEAD\_KEY)
                    .noargs();
        }
    
        /\*\*
          获取死信的配置信息
         \*
         \*\*/
        public Map<String,Object>getMap(){
            //3种方式 任选其一,选择其他方式之前,先把交换机和队列删除了,在启动项目,否则报错。
            //方式一
            Map<String,Object> map=new HashMap<>(16);
            //死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
            map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);
            //死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
            map.put("x-dead-letter-routing-key", DEAD\_KEY);
            //方式二
            //消息的过期时间,单位:毫秒;达到时间 放入死信队列
            // map.put("x-message-ttl",5000);
            //方式三
            //队列最大长度,超过该最大值,则将从队列头部开始删除消息;放入死信队列一条数据
            // map.put("x-max-length",3);
            return map;
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    由于rabbitMq中不直接支持死信队列,需要我们利用插件rabbitmq_delayed_messgae_exchage进行开启

    /**
     * 定义延迟交换机
     */
    @Configuration
    public class RabbitMQDelayedConfig {
        //队列
        private static final String DELAYQUEUE = "delayedqueue";
        //交换机
        private static final String DELAYEXCHANGE = "delayedExchange";
        @Bean
        public Queue delayqueue(){return new Queue(DELAYQUEUE);}
        //自定义延迟交换机
        @Bean
        public CustomExchange delayedExchange(){
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type","direct");
            /**
             * 1、交换机名称
             * 2、交换机类型
             * 3、是否需要持久化
             * 4、是否需要自动删除
             * 5、其他参数
             */
            return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);
        }
        //绑定队列和延迟交换机
        @Bean
        public Binding delaybinding(){
            return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    3.4.2 生产者

    • 1.消费队列的生产者
    import com.example.shop.config.RabbitmqConfig;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.UUID;
    
    @Component
    public class Sender_Direct {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        /**
         * 用于消费订单
         *
         * @param orderId
         */
        public void send2Direct(String orderId) {
            //创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
            MessageProperties messageProperties = new MessageProperties();
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "内容设置",  message -> {
                //设置消息的id为唯一
                messageProperties.setMessageId(UUID.randomUUID().toString());
                messageProperties.setContentType("text/plain");
                messageProperties.setContentEncoding("utf-8");
                message.getMessageProperties().setMessageId(orderId);
                return message;
            });
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    3.4.3 消费者

    1.开启手动ack配置

    spring:
      application:
        name: shop
      rabbitmq:
        host: 192.168.1.102
        port: 5673
        virtual-host: /
        username: guest
        password: guest
        listener:
          simple:
            # 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    消费者要配置ack重试机制,具体参考前几篇文章,使用的是mysql消息ID的唯一性,有时候可能生成一样的订单,具体的没有进行实验,内容是json生成的,可以执行业务

    import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
    import com.example.des.Bean.MessageIdempotent;
    import com.example.des.Bean.Shop;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    
    @Component
    public class Receiver_Direct {
        private static final Integer delayTimes = 30;//延时消费时间,单位:秒
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RabbitListener(queues = {"smsQueue"})
        public void receiveD(Message message, Channel channel) throws IOException {
            try {
                // 获取消息Id
                String messageId = message.getMessageProperties().getMessageId();
                String msg = new String(message.getBody());//获取消息
                //向数据库插入数据
                MessageIdempotent messageIdempotent = new MessageIdempotent();
                messageIdempotent.setMessageId(messageId);
                messageIdempotent.setMessageContent(msg);
                messageIdempotent.setRetryTimes(0);
                System.out.println(messageIdempotent.toString());
                Boolean save = true;   //设置保存成功,消息投递失败是在确认模式那里
    
                if (!save) {//说明属于重重复请求
                    //1、处理消息内容的业务,解析json数据
                    //2、创建订单,并保存
                    Boolean flag = consumeOrder(new Shop());
                    if (flag){
                        //投入延迟队列,如果30分钟订单还没有消费,就删除订单
                        rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{
                            //设置发送消息的延长时间 单位:ms,表示30分钟
                            message1.getMessageProperties().setDelay(1000*60*30);
                            return message1;
                        });
                        //更新消息状态,消费成功,
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    }else {
                        //延迟投入死信,进行重试
                        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                    }
                } else {
                    //1、处理消息内容的业务,解析json数据
                    //2、创建订单,并保存
                    //投入死信队列
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                }
            }catch (Exception e){
                System.out.println("错误信息");
            }
    
        }
    
        private boolean consumeOrder(Shop shop) {
            return true;
        }
    
        @RabbitListener(queues = {" delay.queue.demo.delay.queue"})
        public void dead(String payload, Message message, Channel channel) throws IOException {
            System.out.println("死信队列:"+payload);
            //删除消息 将数据库状态更新为失败,更新邮件或者消息通知,有时候可以人工消费
            long deliveryTag=message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag,true);
        }
    
        @RabbitListener(queues = "delayedqueue")
        public void receivemsg(Message messages){
            //查询有没有被消费,也就是更新成功,有时候需要乐观锁
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    至此mq的消息重复以及幂等的信息处理就很完美的解决了,当然本文以数据库为例进行实现,感兴趣的可以尝试使用redis来进行实现

  • 相关阅读:
    【MapGIS精品教程】001:MapGIS K9完整图文安装教程
    聊聊logback的LevelFilter
    在矩池云上使用Syntaxnet解析文本
    netstat 竟然还能这么玩儿?
    记一次MySql重置root密码无效
    26岁月薪从7k到17K,这一切都要从那年失业讲起...
    IDEA 中使用 SparkSQL 远程连接 Hive
    线性代数学习笔记7-3:解微分方程、矩阵的指数函数(特征值的应用)
    基于FPGA的图像拉普拉斯变换实现,包括tb测试文件和MATLAB辅助验证
    怎么进行视频恢复?推荐使用这4种方法
  • 原文地址:https://blog.csdn.net/weixin_43360488/article/details/132652000