• redis实现延迟队列


    redis延迟队列参考博客:https://blog.csdn.net/m0_37975854/article/details/119836978
    jkd延迟队列参考博客:
    https://www.jianshu.com/p/a1b3aa87f78b

    一、实现延迟队列,a、可以使用mq的死信队列,b、可以使用 redis里面zset的延迟队列 c、可以使用jkd里面的延迟队列

    二、zset实现延迟队列(跟sadd不同的就是有score)

    在这里插入图片描述

    zset参考博客:
    https://blog.csdn.net/cm15835106905/article/details/126323705

    三、实现代码

    3.1、参考博客:

    https://blog.csdn.net/m0_37975854/article/details/119836978

    3.2、通过这篇博客学到了什么

    1. 接口定义规则,多个实现类。规定只能由这个接口实现的对象的class文件才能调用的方法,这么写:Class clazz, ? extends代表某个继承了RedisDelayedQueueListener接口的对象
    2. 如果想要拿到某个接口的全部实现类,使用:
      在这里插入图片描述
    3. 继承ApplicationContextAware就是为了获取ApplicationContext(上下文),
      也可以使用注入的方式获取ApplicationContext;继承ApplicationContextAware则在项目启动的时候就执行setApplicationContext方法
      在这里插入图片描述

    3.3、代码实现

    接口:

    package com.yj.redis;
    
    /**
     * 队列事件监听接口,需要实现这个方法
     * @param 
     */
    public interface RedisDelayedQueueListener<T> {
        /**
         * 执行方法
         *
         * @param t
         */
        void invoke(T t);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    实现类:

    package com.yj.redis;
    
    import com.yj.vo.customer.CustomerManagerVO;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * redis延迟队列,数据过期处理类
     */
    @Component
    @Slf4j
    public class PayQCordListener implements RedisDelayedQueueListener<CustomerManagerVO> {
     
        @Override
        public void invoke(CustomerManagerVO payStateReqVO) {
            log.info("延迟时效时间:{}", new Date());
            log.info("延迟失效,内容:{}", payStateReqVO);
             //处理业务
            log.info("延迟失效,内容:{},处理结果:{}", payStateReqVO,"测试处理");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    创建redis中的zset和score(相当于创建延迟队列):

    package com.yj.redis;
    
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 创建延迟信息
     */
    @Component
    @Slf4j
    public class RedisDelayedQueue {
     
    
        @Resource(name = "getRedisson")
        private  RedissonClient getRedisson;
    
        /**
         * 添加队列
         *
         * @param t        DTO传输类
         * @param delay    时间数量
         * @param timeUnit 时间单位
         * @param       泛型
         */
        private <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
            log.info("开始延迟队列时间:{}", new Date());
            log.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,t);
            RBlockingQueue<T> blockingFairQueue = getRedisson.getBlockingQueue(queueName);
            RDelayedQueue<T> delayedQueue = getRedisson.getDelayedQueue(blockingFairQueue);
            delayedQueue.offer(t, delay, timeUnit);
        }
     
        /**
         * 添加队列-秒
         *
         * @param t     DTO传输类
         * @param delay 时间数量
         * @param    泛型
         */
        // 为什么返回的时候使用
        public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
            addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());
        }
     
        /**
         * 添加队列-分
         *
         * @param t     DTO传输类
         * @param delay 时间数量
         * @param    泛型
         */
        public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
            addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());
        }
     
        /**
         * 添加队列-时
         *
         * @param t     DTO传输类
         * @param delay 时间数量
         * @param    泛型
         */
        public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
            addQueue(t, delay, TimeUnit.HOURS, clazz.getName());
        }
        /**
         * 添加队列-天
         *
         * @param t     DTO传输类
         * @param delay 时间数量
         * @param    泛型
         */
        public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
            addQueue(t, delay, TimeUnit.DAYS, clazz.getName());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    监听redis中数据是否过期,如果过期则进行消费

    package com.yj.redis;
    
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.Map;
    
    /**
     * 初始化队列监听
     */
    @Component
    @Slf4j
    public class RedisDelayedQueueInit implements ApplicationContextAware {
     
    
        @Resource(name = "getRedisson")
        private RedissonClient getRedisson;
    
        /**
         * 获取应用上下文并获取相应的接口实现类
         *
         * @param applicationContext
         * @throws BeansException
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
            for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
                String listenerName = taskEventListenerEntry.getValue().getClass().getName();
                startThread(listenerName, taskEventListenerEntry.getValue());
            }
        }
     
        /**
         * 启动线程获取队列*
         *
         * @param queueName                 queueName
         * @param redisDelayedQueueListener 任务回调监听
         * @param                        泛型
         * @return
         */
        private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
            RBlockingQueue<T> blockingFairQueue = getRedisson.getBlockingQueue(queueName);
            //服务重启后,无offer,take不到信息。
            getRedisson.getDelayedQueue(blockingFairQueue);
            //由于此线程需要常驻,可以新建线程,不用交给线程池管理
            Thread thread = new Thread(() -> {
                log.info("启动监听队列线程" + queueName);
                while (true) {
                    try {
                        T t = blockingFairQueue.take();
                        log.info("监听队列线程,监听名称:{},内容:{}", queueName, t);
                        redisDelayedQueueListener.invoke(t);
                    } catch (Exception e) {
                        log.info("监听队列线程错误,", e);
                    }
                }
            });
            thread.setName(queueName);
            thread.start();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
  • 相关阅读:
    符合 EN55022B 规格、LTM4613EY、LTM4613MPV直流µModule稳压器【RG500Q 5G Sub-6 GHz 模块】
    el-table报错“Cannot read properties of undefined (reading ‘style‘)”解决
    AR Engine运动跟踪能力,高精度实现沉浸式AR体验
    Eureka注册中心
    【数据库】之MYSQL基本语法
    与分类有关的一种时序优先现象
    踩了大坑:wordpress后台 无法将上传的文件移动至wp-content
    Java 递归遍历文件所有目录(案例)
    序列号读取
    【测试开发】Mq消息重复如何测试?
  • 原文地址:https://blog.csdn.net/M1275601161/article/details/127801705