• 【定时功能】消息的定时发送-基于RocketMQ


    一、功能介绍

      要实现一个消息的定时发送功能,也就是让消息可以在某一天某一个时间具体节点进行发送。而我们公司的业务场景是类似短信的业务,而且数量不小,用户会进行号码、消息内容、定时发送时间等信息的提交。等到了设定的定时时间,则进行消息的发送工作。

     

    二、思考实现逻辑

      前提准备:

        MySQL

        RocketMQ,最好broker开启队列自动创建的配置

     

      刚开始我想的是基于MySQL去实现定时发送,后来觉得这种扫描方式单线程的时候并发能力不够,多线程也得需要做并发控制,还得做一些任务的调度,比如线程执行一半卡死或者异常的时候,还得做任务的补偿工作。所以,后面我选择了通过组件的方式实现,这是我想到了消息队列的延迟发送功能,刚好我司系统用到了RocketMQ,我就想通过该组件实现该消息的定时发送功能。RocketMQ的文件默认只保存72小时,这个要注意一下,有需要调整时间的需要调整一下。我也不建议调整的太大,这样容易引起系统的资源浪费。

      具体实现逻辑如下:

      1)用户创建消息,记录到数据库(号码,内容,定时发送时间)

      2)如果是是可以立即发送的消息,发送消息到RocketMQ的立即发送主题(TOPIC_NOW_SEND)中,后续有线程消费该消息,直接进行数据发送。

      3)如果是需要定时发送的消息,发送消息到RocketMQ的延迟发送主题(TOPIC_DELAY_SEND)中。

      这里有一些细节设置:

        tags:可以用于同一主题下区分不同类型的消息。此处可以设定为当前日期(我是用的是YYYYMMDD,例如:20220730),因为设定为需要发送的当天日期后,程序就可以通过该字段分辨是否需要处理。后续消费者就可以通过当天日期筛选需要当天发送的消息,过滤掉不是当天需要发送的消息。

        keys:索引,可以用于查询消息。我们设定的格式可以是【订单号+消息定时发送的时间戳(可以是毫秒级)】,例如:FS20220730123456+16591914490001

      4)启动两个(或者多个,这个和文件保存的时间有关系)消费者进行轮换,这里我称之为masterConsumer(启动时负责消费前一天的数据)和slaveConsumer(启动时负责消费当天的数据)。这里我设计的初衷是每天有自己的消费者,例如20220730消费tags=20220730的数据,20220731消费tags=20220731的数据。这里需要消费前一天的数据原因是,需要防止出现晚上23:59分提交消息的时候,当天没处理完。所以,需要在第二天进行额外的补偿操作。

      程序启动第1天(例如当天是20220730):

        masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND  消费tags=20220729
        slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND  消费tags=20220730

      程序启动第2天(例如当天是20220731):
        slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND  消费tags=20220730
        masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND  消费tags=20220731

      程序启动第3天(例如当天是20220801):
        masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND  消费tags=20220731
        slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND  消费tags=20220801

      程序启动第4天(例如当天是20220802):
        slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND  消费tags=20220801
        masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND  消费tags=20220802

      ......

      程序的masterConsumer和slaveConsumer就这样持续轮换消费数据。

      5)消费者的消费数据逻辑:

      消费者需要设置consumeFromWhere参数=CONSUME_FROM_FIRST_OFFSET(默认是ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,从最新的位置开始消费),这样数据才可以第一次启动从队列的最前位置开始消费。

      消费到数据之后,获取keys的内容,根据之前发送的设定获取消息定时发送的时间戳,然后和当前的时间进行比对。这里要比对的主要原因是当前 RocketMQ 不支持任意时间的延迟。 生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。

      当前支持的消息延迟级别有:

        private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

      如果比对时间<=1秒,则直接发送到RocketMQ的立即发送主题(TOPIC_NOW_SEND)中,后续有线程进行消费发送;否则,发送时间根据RocketMQhi吃的延迟级别进行选择,按照最大可支持的时间为准,等待后续的消息重新消费,直到最终消息符合条件,可以投递到RocketMQ的立即发送主题(TOPIC_NOW_SEND)为止。例如:如果比对时间>2h,则重新投递到延迟发送主题(TOPIC_DELAY_SEND)中,延迟发送的等级为18(也就是2h)。如果比对时间>2m,比对时间<3m,则重新投递到延迟发送主题(TOPIC_DELAY_SEND)中,延迟发送的等级为6(也就是2m)。

      6)在第4步骤中的masterConsumer和slaveConsumer的轮换逻辑支持,即在过了0点之后,前一天的consumer需要进行重置。

      

      以上就是我的实现逻辑。

     

    三、敲代码

      1、定义延迟级别与时间的对应关系

    复制代码
    package cn.lxw.mq.constant;
    
    import java.util.Arrays;
    
    public enum MessageDelayLevel {
        SECOND_1(1, 1 * 1000L),
        SECOND_5(2, 5 * 1000L),
        SECOND_10(3, 10 * 1000L),
        SECOND_30(4, 30 * 1000L),
        MINUTE_1(5, 1 * 60 * 1000L),
        MINUTE_2(6, 2 * 60 * 1000L),
        MINUTE_3(7, 3 * 60 * 1000L),
        MINUTE_4(8, 4 * 60 * 1000L),
        MINUTE_5(9, 5 * 60 * 1000L),
        MINUTE_6(10, 6 * 60 * 1000L),
        MINUTE_7(11, 7 * 60 * 1000L),
        MINUTE_8(12, 8 * 60 * 1000L),
        MINUTE_9(13, 9 * 60 * 1000L),
        MINUTE_10(14, 10 * 60 * 1000L),
        MINUTE_20(15, 20 * 60 * 1000L),
        MINUTE_30(16, 30 * 60 * 1000L),
        HOUR_1(17, 1 * 60 * 60 * 1000L),
        HOUR_2(18, 2 * 60 * 60 * 1000L),
        ;
        //    1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        private Integer level;
        private Long mills;
    
        MessageDelayLevel(Integer level, Long mills) {
            this.level = level;
            this.mills = mills;
        }
    
        public Integer getLevel() {
            return level;
        }
    
        public void setLevel(Integer level) {
            this.level = level;
        }
    
        public Long getMills() {
            return mills;
        }
    
        public void setMills(Long mills) {
            this.mills = mills;
        }
    
        /**
         * 功能描述: 
    * 〈计算需等待的时间,最长2小时〉 * @Param: [timeMills] * @Return: {
    @link MessageDelayLevel} * @Author: luoxw * @Date: 2022/7/26 16:33 */ public static MessageDelayLevel getMaxLevel(long timeMills){ long millsBwt = timeMills - System.currentTimeMillis(); if(millsBwt < 1000L){ millsBwt = 1000L; } final long paramMills = millsBwt; return Arrays.asList(MessageDelayLevel.values()).stream().filter(p -> p.mills.compareTo(paramMills) <= 0).max((c1,c2) -> c1.getLevel().compareTo(c2.getLevel())).orElse(HOUR_2); } public static void main(String[] args) { getMaxLevel(2 * 60 * 60 * 1000L); getMaxLevel(2 * 60 * 60 * 1000L + 1); getMaxLevel(2 * 60 * 60 * 1000L - 1); getMaxLevel( 6 * 1000L); getMaxLevel( 5 * 1000L); getMaxLevel( System.currentTimeMillis()); } }
    复制代码

      2、延迟消息消费逻辑

    复制代码
    package cn.lxw.task;
    
    import cn.hutool.core.thread.ThreadUtil;
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import cn.lxw.Consumer;
    import cn.lxw.DateUtil;
    import cn.lxw.constant.LogConst;
    import cn.lxw.context.AppEnvContext;
    import cn.lxw.enums.EnumMQTopic;
    import cn.lxw.mq.constant.MessageDelayLevel;
    import cn.lxw.util.ProducerUtil;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Slf4j
    public class Send_DelayMessageTask implements Runnable {
        private DefaultMQProducer defaultSendProducer;
        private DefaultMQProducer defaultDelayProducer;
    
    
        private static volatile String currentDate = DateUtil.operateDate(new Date(), 0);
        private DefaultMQPushConsumer masterDelayConsumer;
        private static volatile String lastDate = DateUtil.operateDate(new Date(), -1);
        private DefaultMQPushConsumer slaveDelayConsumer;
        // 2个消费者的时候,奇数更新master消费者,偶数更新slave消费者
        private static volatile AtomicInteger closeCnt = new AtomicInteger();
    
      
        public Send_DelayMessageTask() {
            try {
                String rocketMQAddr = AppEnvContext.getPropValue("rocketmq.address");
                defaultSendProducer = ProducerUtil.init(rocketMQAddr, "PRODUCER_TOPIC_NOW_SEND");
                defaultDelayProducer = ProducerUtil.init(rocketMQAddr, "PRODUCER_TOPIC_DELAY_SEND");
                // 主从消费者交替进行数据消费。
                // 初始化的时候,主消费者消费昨天(1月1日)的数据,从消费者消费今天(1月2日)的数据。
                int consumeThreadMin = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMin", 10);
                int consumeThreadMax = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMax", 10);
                int consumeMessageBatchMaxSize = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeMessageBatchMaxSize", 10);
                masterDelayConsumer = Consumer.getInstance(
                        "TOPIC_DELAY_SEND",
                        "MASTER_CONSUMER_TOPIC_DELAY_SEND",
                        rocketMQAddr,
                        lastDate,
                        consumeThreadMin,
                        consumeThreadMax,
                        consumeMessageBatchMaxSize
                );
                slaveDelayConsumer = Consumer.getInstance(
                        "TOPIC_DELAY_SEND",
                        "SLAVE_CONSUMER_TOPIC_DELAY_SEND",
                        rocketMQAddr,
                        currentDate,
                        consumeThreadMin,
                        consumeThreadMax,
                        consumeMessageBatchMaxSize
                );
            } catch (Exception e) {
                log.error(LogConst.PREFIX + "初始化异常", e);
            }
        }
    
        // 消费者重启
        private void restartConsumer(DefaultMQPushConsumer consumer, String consumerName) throws Exception {
            String rocketMQAddr = AppEnvContext.getPropValue("rocketmq.address");
            consumer.shutdown();
            int consumeThreadMin = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMin", 10);
            int consumeThreadMax = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMax", 10);
            int consumeMessageBatchMaxSize = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeMessageBatchMaxSize", 10);
            consumer = Consumer.getInstance(
                    "TOPIC_DELAY_SEND",
                    consumerName,
                    rocketMQAddr,
                    currentDate,
                    consumeThreadMin,
                    consumeThreadMax,
                    consumeMessageBatchMaxSize
            );
            MessageListenerConcurrently listener = getDelayListener();
            consumer.registerMessageListener(listener);
            consumer.start();
            closeCnt.incrementAndGet();
        }
    
        // 重新初始化消费者
        private synchronized void reInitComsumer(){
            try {
                String thisDate = DateUtil.operateDate(new Date(), 0);
                int lastCloseCnt = closeCnt.get();
                // 初始化的时候不执行 and 时间没跨度的时候不执行,只有当第二天的时候才执行数据
                if(lastCloseCnt > 0 && !thisDate.equals(currentDate)){
                    // 获取今天的时间
                    currentDate = thisDate;
                    // 如果时间对2取余的值为1,则更新主消费者,之前的昨天(1月1日)更新为第二天(1月3日)的数据
                    if(closeCnt.get() % 2 == 1) {
                        restartConsumer(masterDelayConsumer, "MASTER_CONSUMER_TOPIC_DELAY_SEND");
                    }
                    // 如果时间对2取余的值为0,则更新从消费者,之前的今天(1月2日)更新为第三天(1月4日)的数据
                    if(closeCnt.get() % 2 == 0){
                        restartConsumer(slaveDelayConsumer, "SLAVE_CONSUMER_TOPIC_DELAY_SEND");
                    }
                }
            } catch (Exception e) {
                log.error(LogConst.PREFIX + "消费者重新初始化异常", e);
            }
        }
        
        // 消费逻辑
        private MessageListenerConcurrently getDelayListener(){
            MessageListenerConcurrently listener = (List list, ConsumeConcurrentlyContext context) -> {
                for (MessageExt msg : list) {
                    String msgBody = null;
                    try {
                        msgBody = new String(msg.getBody());
                        String msgKeys = msg.getKeys();
                        // 订单号+发送时间戳
                        String[] split = msgKeys.split("\\+");
                        if(split.length == 2) {
                            String delaySendTimeStr = split[1];
                            long time = DateUtil.parseStr2TimeMills(delaySendTimeStr);
                            // 时间比对,获取最大支持的延迟级别
                            MessageDelayLevel maxLevel = MessageDelayLevel.getMaxLevel(time);
                            Integer level = maxLevel.getLevel();
                            // 如果延迟级别=1,也就是小于等于1秒的时候,直接发送到立即发送主题(TOPIC_NOW_SEND)
                            if (MessageDelayLevel.SECOND_1.getLevel().equals(level)) {
                                SendResult sendResult = ProducerUtil.syncSend(
                                        defaultSendProducer,
                                        "TOPIC_NOW_SEND",
                                        msg.getTags(),
                                        msg.getKeys(),
                                        msgBody
                                );
                                log.info(LogConst.PREFIX + "定时消息[{}]发送到[立即发送主题]结果:{}", msgBody, sendResult);
                            } else {
                                SendResult sendResult = ProducerUtil.syncSendDelay(
                                        defaultDelayProducer,
                                        "TOPIC_DELAY_SEND",
                                        msg.getTags(),
                                        msg.getKeys(),
                                        level,
                                        msgBody
                                );
                                log.info(LogConst.PREFIX + "定时消息[{}]发送到[延迟发送主题]结果:{}", msgBody, sendResult);
                            }
                        }
                    } catch (Exception ex) {
                        log.error(LogConst.PREFIX + "定时消息[{}]发送异常", msgBody, ex);
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            };
            return listener;
        }
    
        @Override
        public void run() {
            try {
                MessageListenerConcurrently masterListener = getDelayListener();
                masterDelayConsumer.registerMessageListener(masterListener);
                MessageListenerConcurrently slaveListener = getDelayListener();
                slaveDelayConsumer.registerMessageListener(slaveListener);
                // 启动消费者
                masterDelayConsumer.start();
                slaveDelayConsumer.start();
            }catch (Exception ex){
                log.error(LogConst.PREFIX + "定时消息消费程序启动异常", ex);
            }
    
            while (true){
                // 一分钟检查一次是否需要轮换消费者
                ThreadUtil.sleep(60 * 1000);
                reInitComsumer();
            }
        }
    }
    复制代码

      3、其余的逻辑就不展示了,可以根据自己实际业务情况进行处理。

     

    四、总结

      这是我实现的一个方式,其实是不太完美的,因为量大的时候性能也是有问题的,因为根据RocketMQ文件保存的时间(默认72小时)因素决定。如果保存的时间太长,消息太多,而我们的程序第一次启动从队列的最前位置开始消费,此时的消息数据是巨大的。不管是对RocketMQ组件,还是对我们程序都是一个压力,而且目前的消息过滤功能只能在客户端进行处理。所以,这里我建议的是一天一个主题,以定时发送的当天进行发送。例如:20220730需要发送的数据用TOPIC_DELAY_SEND_20220730主题;20220731需要发送的数据用TOPIC_DELAY_SEND_20220731主题。这样的话,数据消费的时候也不需要通过tags进行过滤了,只需要通过keys的时间戳数据进行判断,消费逻辑大致和文中写法一样,只需要调整延迟发送的主题名为【TOPIC_DELAY_SEND_YYYYMMDD】,发送和轮换的延迟发送主题也需要根据发送时间进行调整。

      以上思路仅供各位参考使用,有更好的实现方式可以在下方进行留言。谢谢大家的观看!衷心感谢!

     

  • 相关阅读:
    【WSN】无线传感器网络 X-Y 坐标到图形视图和位字符串前缀嵌入方法研究(Matlab代码实现)
    【图形学】28 更多的透明等式和参数
    vue + html + Lodop打印功能
    前端页面左右布局,点击左div增加动画过渡效果
    LVS+Keepalived+nfs 集群部署及实验
    Java:Spring Boot整合mybatis-plus示例
    机器人动力学与参数辨识学习笔记(一)
    双因子身份认证如何保障 Windows 系统登录安全?
    深度学习入门(二十八)卷积神经网络——AlexNet
    kubernetes集群编排(6)
  • 原文地址:https://www.cnblogs.com/luozhuzhu/p/16536197.html