• RabbitMQ的应用场景


    一、RabbitMQ的安装和初步配置

    1、安装

    1)我这里为了方便,是用docker部署的RabbitMQ;要用docker部署,首先要保证linux已经安装好了docker,并且docker运行正常,可以参考我之前的文章-----Docker的安装和卸载教程

    2)确保Linux系统的docker环境正常后,只需执行以下命令,docker容器便会自动从Docker Hub中拉取RabbitMQ的镜像,并创建容器(注意:docker会自动帮我们部署Erlang环境)。

    参数解释:该命令包括安装Web页面管理的 rabbitmq:management组件,账号和密码都为 admin ;-p 后面参数表示公网IP地址的端口号对应容器内部的端口号。

    docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
    
    • 1

    2、访问RabbitMQ的Web页面

    rabbitmq有一个默认账号和密码是: guest ,默认情况只能在 localhost本计下访问;所以我们需要通过刚才创建的admin用户进行登录。输入 http://IP地址:15672 即可完成访问,账号密码都为admin。
    在这里插入图片描述

    3、Docker安装rabbitmq后的一些操作

    1)执行rabbitmq命令

    用docker安装rabbtimq后,对rabbitmq的操作命令,需要进入rabbitmq镜像中的rabbimq容器里边执行才有效;
    a、先查看docker正在运行的容器,找到rabbitmq
    在这里插入图片描述
    b、执行如下命令,进入rabbitmq容器

    docker exec -i -t 59b bin/bash
    
    • 1

    其中“59b”是容器id的前三个字母,写三个就够了
    c、之后就可以正常的执行rabbitmq命令了

    2)新增一个rabbimq用户,并分配角色和权限

    a、1)中操作完成后进入rabbitmq容器,执行添加用户命令

    rabbitmqctl add_user derek derek
    
    • 1

    b、给用户分配角色,这里设置为超级管理员角色

    rabbitmqctl set_user_tags derek administrator
    
    • 1

    c、给用户分配权限,这里给的最大权限

    rabbitmqctl set_permissions -p / derek ".*" ".*" ".*"
    
    • 1

    d、可以使用命令rabbitmqctl list_users查看用户列表,也可以进入rabbitmq客户端的admin页签查看用户列表,如下图
    在这里插入图片描述
    在这里插入图片描述

    3)重启,或者虚拟机宕机后重启rabbimq

    a、先启动docker

    systemctl start docker
    
    • 1

    b、查看docker镜像,重启rabbtmq镜像

    二、RabbitMQ的应用场景

    RabbitMQ 是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据 RabbitMQ 配置的转发机制接收服务端发来的消息。RabbitMQ 依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

    1、服务解耦

    假设有这样一个场景,服务 A 产生数据,而服务 B,C,D 需要这些数据,那么我们可以在 A 服务中直接调用 B,C,D 服务,把数据传递到下游服务即可。

    但是,随着我们的应用规模不断扩大,会有更多的服务需要 A 的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么 A 服务中调用代码的维护会极为困难。

    这是由于服务之间耦合度过于紧密。

    再来考虑用 RabbitMQ 解耦的情况。

    A 服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可。

    2、流量削峰

    假设我们有一个应用,平时访问量是每秒 300 请求,我们用一台服务器即可轻松应对。
    而在高峰期,访问量瞬间翻了十倍,达到每秒 3000 次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到 10 台服务器,来分散访问压力。

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们 10 台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了。

    这种情况,我们就可以使用 RabbitMQ 来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力。

    这是消息队列服务器非常典型的应用场景。

    3、异步调用

    考虑定外卖支付成功的情况。

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长。

    这样就造成整条调用链路响应非常缓慢。

    而如果我们引入 RabbitMQ 消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有 200 毫秒左右。

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作。

    三、应用实例(1个消息->1个消费者->多个实现分别执行不同任务)

    1、应用场景

    以订单入库为例,订单入库后向rm发一个消息。
    订单入库之后会有多项后续操作需要进行,比如优惠券模块、减库存模块、发邮件模块等等;为了降低各模块的耦合度,同时减少下订单接口的压力,我们希望通过一次rm消息投递,就完成下单之后的减库存、发邮件、发优惠券等多项后续操作。

    2、实例代码及说明

    1)写一个订单入库的接口,需要在订单入库之后进行的操作都通过实现这个接口达成

    /**
     * 交易入库事件
     */
    public interface TradeIntoDbEvent {
    
        void onTradeIntoDb(TradeVO tradeVO);
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

    2)rm的消费者,通过@Auwired注入这个接口的所有实现,在消费方法中循环调用全部实现

    /**
     * 订单入库消息处理
     */
    @Component
    public class OrderIntoDbReceiver {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
    	//注意这里,一下注入接口的所有实现,需要定义list变量
        @Autowired(required = false)
        private List<TradeIntoDbEvent> events;
    
        @Autowired
        private Cache cache;
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = AmqpExchange.ORDER_CREATE + "_QUEUE"),
                exchange = @Exchange(value = AmqpExchange.ORDER_CREATE, type = ExchangeTypes.FANOUT)
        ))
        public void tradeIntoDb(String tradeVOKey) {
    
            TradeVO tradeVO = (TradeVO) this.cache.get(tradeVOKey);
            if (events != null) {
            	//循环调用所有接口实现,使订单入库后所有的后续操作得以执行
                for (TradeIntoDbEvent event : events) {
                    try {
                        event.onTradeIntoDb(tradeVO);
                    } catch (Exception e) {
                        logger.error("交易入库消息出错", e);
                    } finally {
                        cache.remove(tradeVOKey);
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3)之后再有功能上的增、删,比如订单入库多一个后续操作,或者少一个后续操作,就对接口的实现进行增删即可

    3、这种方式需要注意的地方

    因为一个消息有多个实现方法,所以消费者方法中不能进行消息确认(因为不好确认谁最后一个执行,消息确认可能造成部分实现无法顺利完成);

    这种情况下,对于非幂等的操作,需要在接口实现中防止重复消费(只是防止重复消费,不要进行消息确认)。

    三、(防止)重复消费

    1、重复消费的原因

    1)多个消费者,其中一个消费信息后没有通知队列(没有给mq发ack,或者给mq发ack的过程中挂了、失败了),队列又把消息发给了其他消费者就会重复消费。

    2)对于非幂等性操作,要防止重复消费;
    对于幂等性操作一般不用防止重复消费。
    多次操作,得到的结果相同,就是幂等性操作;比如,查询一般属于幂等性操作,新增、更新属于非幂等性操作。

    2、解决方法

    用redis。
    在消费者收到消息时,先执行setnx,setnx是redis命令,是’SET if Not eXists’(如果不存在,则 SET)的简写,RedisTemplate中也有对应的方法。setnx执行后,如果返回true说明key不存在、设置成功,如果返回false说明key已存在、设置失败。

    eg:我们约定
    id=0,表示正在执行
    id=1,表示执行成功

    消费者收到消息,在执行业务前,先用setnx方法试着将id放入redis中,

    Boolen result = redisTemplate.opsForValue().setIfAbsent(orderId+"_reineforce", "0");
    
    • 1

    1)如果key不存在,即setnx返回true,消费者就正常消费;
    2)如果key存在,即setnx返回false,那么就要取出key的值作判断,
    为0就不消费;
    为1就说明该消息其他消费者已经消费过了,只是发ack没成功,这里直接ack就行。

    极端情况
    第一个消费者正在执行业务时,挂了、出现死锁,这种怎么办?
    在setnx的基础上,再给key设置一个生存时间,防止出现死锁。

    四、延时队列

    1、延时队列的应用场景

    2、RabbitMQ实现掩饰队列的底层逻辑

    1)RabbitMQ 中的 TTL

    TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
    目前有两种方法可以设置消息的 TTL:
    第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间;
    第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。
    如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。

    2)死信(Dead Letter)队列

    “死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

    a、消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
    b、消息在队列的存活时间超过设置的TTL时间。
    c、消息队列的消息数量已经超过最大队列长度。

    那么该消息将成为“死信”。

    “死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

    3)应用RabbiMQ死信,实现延时队列功能

    a、给消息m设置TTL,通过一个普通交换机X1路由到一个普通队列Q1;
    b、这个普通队列Q1并没有配置消费者,但Q1事先配置了一个死信交换机X2,当TTL时间到了之后、消息m变成死信,就会被丢到死信交互机X2中;
    c、X2会将消息m路由到对应的队列、也叫死信队列Q2中,Q2事先配置了消费者C,这样消息m就会被投递到消费者那里。

    总结一下:消息m,发送到rabbitmq后,经过了设置的ttl时间,被投递到消费者C那里。
    
    • 1

    这时,也许有小伙伴会说,这有什么,不就是一个简单的定时吗?
    非也、非也,我们说一个需求场景,帮助大家理解:假设商城要做拼团活动功能,你的任务是在活动开始时间,给发起人发一个消息,很简单明了的一个功能。
    发短信功能简单,这个需求的关键,是在活动开始时要触发发短信接口,也就是活动开始时你要能感知到;注意啊、是活动开始时间,不是活动创建时间,我完全可以创建一个2天后开始的拼团活动,所以肯定不能在创建活动时调用发短信接口。
    这时你会说,这还不简单,创建活动时根据开始时间与当前时间的差值,设置一个定时器不就可以了?
    你仔细想一下,活动有很多个,活动的开始时间千变万化,难道你要为每一个活动创建一个定时器吗?这显然不可能。
    当然,定时器配合数据库表,可以实现动态配置;这样做是可以,但一来这样做费老鼻子劲儿了,二来这么多同时运行的定时器,对系统性能的耗费是巨大的,为了这么一个小功能显然得不偿失。
    这时你再看rabbimq的延时队列,是不是怎么看怎么顺眼?可靠,又因为是中间件耦合性小,写成延时功能后可以像工具方法一样到处使用,高可用、但对系统性能牺牲却很小,完美。
    在多说一点,RabbitMQ除了支持给消息设置TTL,还支持给队列设置TTL;但是给队列设置TTL和普通的定时器一样,不能重复使用,所以也就不多说了。

    4)延时消息插件rabbitmq_delayed_message_exchange

    【rabbitmq_delayed_message_exchange插件的介绍和使用方法】
    3中讲述的延时队列模型有一个问题,比如我们给队列Q1中发送了两个消息,第一个是5天后执行(即TTL等于5天的毫秒值)的消息m1,第二个是2天后执行的消息m2;结果你执行后发现,消息m1和m2都是5天后收到的、这就是说消息m2并没有被按时发送。
    这是由于,rabbitmqRabbitMQ 只会检查第一个消息是否过期,如果前边的消息TTL比后边的大,那么后面的消息即使已经超时也不能变成死信、也就不能按时投递到消费者了。
    为了更方便的实现延时队列功能,RabbitMQ提供了rabbitmq_delayed_message_exchange插件;需要开发者自己进行安装。
    这个插件,相当于提供了一个新类型的交换机,所以插件安装成功后,在rabbitmq控制台页面,查看exchange类型,如果出现x-delayed-message类型的选项
    在这里插入图片描述

    a)实现机制

    x-delayed-message类型的交换机支持延迟投递机制,消息传到交换机后、并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中

    b)使用方法

    因为x-delayed-message类型的交换机支持延迟投递机制,所以rabbitmq延时队列模型得到简化
    我们只需要新建一个死信队列与x-delayed-message类型的交换机进行绑定,即可实现上面3中的延时队列功能。
    需要注意:用户必须使用名为x-delay的特殊header,向交换机发布延迟消息,该header需要一个整数,表示RabbitMQ应延迟消息的毫秒数。

    rabbitTemplate.convertAndSend(TimeTriggerConfig.DELAYED_EXCHANGE_XDELAY, TimeTriggerConfig.DELAY_ROUTING_KEY_XDELAY, timeTriggerMsg, message -> {
                Long current = DateUtil.getDateline();
                //设置延时时间,目标时间多5秒
                Long time = (triggerTime - current) * 1000 + 5000 ;
                    message.getMessageProperties().setHeader("x-delay", time);
                return message;
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    c)优缺点

    插件优点:
    不需要为延迟消息单独创建路由、交换器、队列;

    插件缺点:
    1、不支持对已发送消息进行管理,只能在Web管理页面查看发送的数量DM;
    2、集群中只有一个副本(保存在当前节点下的Mnesia表中),如果节点不可用或关闭插件会丢失消息;
    3、目前该插件只支持disk节点,不支持ram节点;
    4、性能比原生的差一点(普通的Exchange收到消息后直接路由到队列,而延迟队列需要判断消息是否过期,未过期的需要保存在表中,时间到了再捞出来路由);

    2)rabbitmq_delayed_message_exchange插件的安装

    3、应用实例——生产者

    1)延时任务接口(消息生产者使用的接口)

    这里生产者模块化为一个接口,我觉得直接写一个类也可以,因为这个接口只有一个实现。

    注意下面延时队列触发器的add方法,第一个参数executerName。因为之后消息消费者,调用执行接口是高可用的设计,即执行接口有多个实现,所以这里将本次使用延时队列需要调用的实现类的BeanId、也就是首字母小写的类名(默认是这样,有配置另说)放入消息体传过去;这样执行器执行前,先通过ApplicationContext类的getBean(beanId);方法得到一个指定执行器实现的对象。这个地方,正是本延时队列、代码高可用设计的关键所在。

    /**
     * 延时执行接口
     */
    public interface TimeTrigger {
    
        /**
         * 添加延时任务
         *
         * @param executerName 执行器beanid
         * @param param        执行参数
         * @param triggerTime  执行时间 时间戳 秒为单位
         * @param uniqueKey    如果是一个 需要有 修改/取消 延时任务功能的延时任务,<br/>
         *                     请填写此参数,作为后续删除,修改做为唯一凭证 <br/>
         *                     建议参数为:PINTUAZN_{ACTIVITY_ID} 例如 pintuan_123<br/>
         *                     业务内全局唯一
         */
        void add(String executerName, Object param, Long triggerTime, String uniqueKey);
    
        /**
         * 修改延时任务
         *
         * @param executerName   执行器beanid
         * @param param          执行参数
         * @param triggerTime    执行时间 时间戳 秒为单位
         * @param oldTriggerTime 旧的任务执行时间
         * @param uniqueKey      添加任务时的唯一凭证
         */
        void edit(String executerName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey);
    
        /**
         * 删除延时任务
         *
         * @param executerName 执行器
         * @param triggerTime  执行时间
         * @param uniqueKey    添加任务时的唯一凭证
         */
        void delete(String executerName, Long triggerTime, String uniqueKey);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    2)延时任务接口实现(消息生产者使用的接口实现)

    /**
     * 延时任务生产 rabbitmq实现
     *
     * @Description: 原理:利用amqp的死信队列的超时属性,将超时的任务转到普通队列交给消费者执行。
     * 添加任务,将任务执行标识、beanid、执行时间,hash值存入redis,标识任务需要执行
     * 任务编辑,将之前的标识删除,重新添加任务
     * 添加删除,删除redis中的任务标识,消费者执行时获取不到 redis中的标识,则不会执行延时任务
     */
    @Component
    public class RabbitmqTimeTrigger implements TimeTrigger {
    
        /**
         * 引入rabbit的操作模板
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private Cache cache;
    
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
    
        /**
         * 添加延时任务
         *
         * @param executerName 执行器
         * @param param        执行参数
         * @param triggerTime  执行时间
         * @param uniqueKey    如果是一个 需要有 修改/取消 延时任务功能的延时任务,<br/>
         *                     请填写此参数,作为后续删除,修改做为唯一凭证 <br/>
         *                     建议参数为:PINTUAZN_{ACTIVITY_ID} 例如 pintuan_123<br/>
         *                     业务内全局唯一
         */
        @Override
        public void add(String executerName, Object param, Long triggerTime, String uniqueKey) {
    
            if (StringUtil.isEmpty(uniqueKey)) {
                uniqueKey = StringUtil.getRandStr(10);
            }
            //标识任务需要执行
            cache.put(RabbitmqTriggerUtil.generate(executerName, triggerTime, uniqueKey), 1);
    
            TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executerName, param, triggerTime, uniqueKey);
            logger.debug("定时执行在【" + DateUtil.toString(triggerTime, "yyyy-MM-dd HH:mm:ss") + "】,消费【" + param.toString() + "】");
            rabbitTemplate.convertAndSend(TimeTriggerConfig.DELAYED_EXCHANGE_XDELAY, TimeTriggerConfig.DELAY_ROUTING_KEY_XDELAY, timeTriggerMsg, message -> {
    
                Long current = DateUtil.getDateline();
                //如果执行的延时任务应该是在现在日期之前执行的,那么补救一下,要求系统一秒钟后执行
                if (triggerTime < current) {
                    message.getMessageProperties().setDelay(1000);
                } else {
                    Long time = (triggerTime - current) * 1000 + 5000 ;
                    message.getMessageProperties().setHeader("x-delay", time);
                }
                logger.debug("还有【" + message.getMessageProperties().getExpiration() + "】执行任务");
    
                return message;
            });
        }
    
        /**
         * 修改延时任务
         *
         * @param executerName 执行器
         * @param param        执行参数
         * @param triggerTime  执行时间
         * @param uniqueKey    添加任务时的唯一凭证
         */
        @Override
        public void edit(String executerName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey) {
    
            //标识任务放弃
            cache.remove(RabbitmqTriggerUtil.generate(executerName, oldTriggerTime, uniqueKey));
            //重新添加任务
            this.add(executerName, param, triggerTime, uniqueKey);
        }
    
        /**
         * 删除延时任务
         *
         * @param executerName 执行器
         * @param triggerTime  执行时间
         * @param uniqueKey    添加任务时的唯一凭证
         */
        @Override
        public void delete(String executerName, Long triggerTime, String uniqueKey) {
            cache.remove(RabbitmqTriggerUtil.generate(executerName, triggerTime, uniqueKey));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    3)其中一个生产者的具体实例

    /**
     * 拼团业务类
     */
    @Service
    public class PintuanManagerImpl implements PintuanManager {
    
        @Autowired
        private PintuanMapper pintuanMapper;
        ...
        
    //只记录消息生产部分代码
    	@Override
        @Transactional(value = "tradeTransactionManager", propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
        public Pintuan add(Pintuan pintuan) {
            //检测开始时间和结束时间
            PromotionValid.paramValid(pintuan.getStartTime(), pintuan.getEndTime(), 1, null);
            this.verifyParam(pintuan.getStartTime(), pintuan.getEndTime());
            pintuan.setStatus(PromotionStatusEnum.WAIT.name());
            pintuan.setSellerName(UserContext.getSeller().getSellerName());
            pintuan.setCreateTime(DateUtil.getDateline());
            pintuan.setSellerId(UserContext.getSeller().getSellerId());
            //可操作状态为nothing,代表活动不可以执行任何操作
            pintuan.setOptionStatus(PintuanOptionEnum.NOTHING.name());
            this.pintuanMapper.insert(pintuan);
    
            //创建活动 启用延时任务,活动开始时启动拼团
            PintuanChangeMsg pintuanChangeMsg = new PintuanChangeMsg();
            pintuanChangeMsg.setPintuanId(pintuan.getPromotionId());
            pintuanChangeMsg.setOptionType(1);
            timeTrigger.add(TimeExecute.PINTUAN_EXECUTER, pintuanChangeMsg, pintuan.getStartTime(), TRIGGER_PREFIX + pintuan.getPromotionId());
    
            return pintuan;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    4、应用实例——消费者

    1)消费者

    /**
     * 延时任务 消息消费者
     */
    @Component
    public class TimeTriggerConsumer {
    
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
    
        @Autowired
        private Cache cache;
    
        /**
         * 接收消息,监听 CONSUMPTION_QUEUE 队列
         */
        @RabbitListener(queues = TimeTriggerConfig.IMMEDIATE_QUEUE_XDELAY)
        public void consume(TimeTriggerMsg timeTriggerMsg) {
    
            try {
                String key = RabbitmqTriggerUtil.generate(timeTriggerMsg.getTriggerExecuter(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());
    
                //如果这个任务被标识不执行
                if (cache.get(key) == null) {
    
                    logger.debug("执行器执行被取消:" + timeTriggerMsg.getTriggerExecuter() + "|任务标识:" + timeTriggerMsg.getUniqueKey());
                    return;
                }
                logger.debug("执行器执行:" + timeTriggerMsg.getTriggerExecuter());
                logger.debug("执行器参数:" + JsonUtil.objectToJson(timeTriggerMsg.getParam()));
    
                //执行任务前 清除标识
                cache.remove(key);
    
                TimeTriggerExecuter timeTriggerExecuter = (TimeTriggerExecuter) ApplicationContextHolder.getBean(timeTriggerMsg.getTriggerExecuter());
                timeTriggerExecuter.execute(timeTriggerMsg.getParam());
    
            } catch (Exception e) {
                logger.error("延时任务异常:", e);
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    2)消费者调用接口

    /**
     * 延时任务执行器接口
     */
    public interface TimeTriggerExecuter {
    
    
        /**
         * 执行任务
         * @param object 任务参数
         */
        void execute(Object object);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    接口高可用、解耦合,有多个实现,用以支持不同的模块使用延时队列功能
    在这里插入图片描述

    3)其中一个实现的具体操作

    第一步都是将泛型类转成一个具体的、特定的实体类对象

    /**
     * 拼团定时开启关闭活动 延时任务执行器
     */
    @Component("pintuanTimeTriggerExecute")
    public class PintuanTimeTriggerExecuter implements TimeTriggerExecuter {
    
        @Autowired
        private TimeTrigger timeTrigger;
    
        @Autowired
        private PintuanClient pintuanClient;
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        /**
         * 执行任务
         */
        @Override
        public void execute(Object object) {
            PintuanChangeMsg pintuanChangeMsg = (PintuanChangeMsg) object;
    
            //如果是要开启活动
            if (pintuanChangeMsg.getOptionType() == 1) {
                Pintuan pintuan = pintuanClient.getModel(pintuanChangeMsg.getPintuanId());
                if (PromotionStatusEnum.WAIT.name().equals(pintuan.getStatus()) ||
                        (PromotionStatusEnum.END.name().equals(pintuan.getStatus()) && PintuanOptionEnum.CAN_OPEN.name().equals(pintuan.getOptionStatus()))) {
                    pintuanClient.openPromotion(pintuanChangeMsg.getPintuanId());
                    //开启活动后,立马设置一个关闭的流程
                    pintuanChangeMsg.setOptionType(0);
                    timeTrigger.add(TimeExecute.PINTUAN_EXECUTER, pintuanChangeMsg, pintuan.getEndTime(), "{TIME_TRIGGER}_" + pintuan.getPromotionId());
                    this.logger.debug("活动[" + pintuan.getPromotionName() + "]开始,id=[" + pintuan.getPromotionId() + "]");
                }
            } else {
                //拼团活动结束
                Pintuan pintuan = pintuanClient.getModel(pintuanChangeMsg.getPintuanId());
                if (pintuan.getStatus().equals(PromotionStatusEnum.UNDERWAY.name())) {
                    pintuanClient.closePromotion(pintuanChangeMsg.getPintuanId());
                }
                this.logger.debug("活动[" + pintuan.getPromotionName() + "]结束,id=[" + pintuan.getPromotionId() + "]");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
  • 相关阅读:
    系统架构师备考倒计时24天(每日知识点)
    可转债长期持有策略——收益与风险、利息收入、案例研究
    C/C++面试高频知识点八股文
    基于MATLAB的GPS卫星绕地运行轨迹动态模拟仿真
    烟花爆竹厂如何做到0风险0爆炸事故?AI+视频监控平台给出答案
    项目经理制定项目计划的大作用:明确目标、步骤和监控
    springboot 启动原理、启动过程、启动机制的介绍
    轻松读懂spring之 IOC的主干流程
    计算机网络(五)
    ES / Kibana 快速安装配置记录
  • 原文地址:https://blog.csdn.net/Derek7117/article/details/127885918