• 延时队列DelayQueue的使用


    DelayQueue的简单应用

    代码块

    Message.java (延时队列定义)

    @Data
    public class Message implements Delayed {
    
        private Map<String,Object> msgBody=new HashMap<>();  //消息内容
        private long excuteTime;//执行时间
        private String type;
    
        public Message(long delayTime,String type) {
            this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
            this.type=type;
        }
        @Override
        public int compareTo(Delayed delayed) {
            long d = (getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    DelayQueueManager.java (延时队列创建)

    public class DelayQueueManager {
    
        private static DelayQueue<Message> delayQueue=null;
    
        static {
            // 创建延时队列
            delayQueue = new DelayQueue<Message>();
        }
    
        public static DelayQueue<Message> getDelayQueue(){
            return delayQueue;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Consumer.java (多线程消费者)

    public class Consumer implements Runnable{
    
        private DelayQueue<Message> queue;
        public static boolean  isRun=false;
    
        public Consumer(DelayQueue<Message> queue){this.queue=queue;}
    
        @SneakyThrows
        @Override
        public void run() {
            isRun=true;
            while (true) {
                Message take = queue.take();
                System.out.println("消息类型:" + take.getType()+"-->"+ LocalTime.now()+"-->start");
                Map<String,Object> map=take.getMsgBody();
                System.out.println("消息内容: "+map.toString());
                System.out.println("消息类型:" + take.getType()+"-->"+ LocalTime.now()+"-->end");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    Demo (生产者)

    public class Demo {
    
        public static void main(String[] args) {
            // 添加延时消息,msg 延时5s= 5*1000
            Message msg = new Message( 5000,"延时任务1");
            msg.getMsgBody().put("name","大黄");
            msg.getMsgBody().put("color","黄");
            msg.getMsgBody().put("age",2);
            msg.getMsgBody().put("food","鸡腿");
            DelayQueueManager.getDelayQueue().offer(msg);
            if(!Consumer.isRun){
                // 启动消费线程
                new Thread(new Consumer(DelayQueueManager.getDelayQueue())).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    结果

    消息类型:延时任务1-->11:08:29.557-->start
    消息内容: {color=, name=大黄, age=2, food=鸡腿}
    消息类型:延时任务1-->11:08:29.558-->end
    
    
    • 1
    • 2
    • 3
    • 4

    DelayQueue在Spring中的应用

    注意:
    生产者在使用延时队列的时候需要将消息内容通过Map或者实体类传进去,我觉得用Map好一点方便扩充消息内容。

    map和实体类之间相互转换的依赖

            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.54version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    JSON.parseObject(JSON.toJSONString(源数据), 转换后数据类型.class);
    // 将 Map 转换为 实体类
    User user = JSON.parseObject(JSON.toJSONString(user01), User.class);
    // 将 实体类 转换为 Map
    Map map = JSON.parseObject(JSON.toJSONString(user), Map.class);

    代码

    实体类DelayDog

    @Data
    public class DelayDog {
    
        private String name ;
        private String color ;
        private Integer age ;
        private String food ;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    延时任务最终消费者–DelayService.interface

    public interface DelayService {
    
        public void dogRun(DelayDog delayDog);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    延时任务最终消费者–DelayServiceImpl.java

    @Service
    public class DelayServiceImpl implements DelayService {
        @Override
        public void dogRun(DelayDog delayDog) {
            System.out.println("限时任务开始执行!");
            System.out.println(delayDog.getName()+"喜欢吃"+delayDog.getFood());
            System.out.println("限时任务结束!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    注意: 多线程为了线程安全会防止注入,因此在想使用service业务类时,需要使用ApplicationContext的方式获取bean的方法获取service类。不然则会导致我们的消费者在run方法里面调用service层业务时会报空指针异常。这是由于spring注入的业务类为null,或者直接new的业务对象也为null。

    我们需要获取ApplicationContext的类要实现ApplicationContextAware接口,如下:

    ApplicationContextUtil

    @Component
    public class ApplicationContextUtil implements ApplicationContextAware {
    
        private static ApplicationContext context;
        @Override
        public void setApplicationContext(ApplicationContext context) throws BeansException {
            this.context = context;
        }
        public static ApplicationContext getContext() {
            return context;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Consumer.java (多线程消费者)

    public class Consumer implements Runnable{
    
        DelayService delayService= ApplicationContextUtil.getContext().getBean(DelayService.class);
    
        private DelayQueue<Message> queue;
        public static boolean  isRun=false;
    
        public Consumer(DelayQueue<Message> queue){this.queue=queue;}
    
        @SneakyThrows
        @Override
        public void run() {
            isRun=true;
            while (true) {
                Message take = queue.take();
                System.out.println("消息类型:" + take.getType()+"-->"+ LocalTime.now()+"-->start");
                Map<String,Object> map=take.getMsgBody();
                System.out.println("消息内容: "+map.toString());
                DelayDog delayDog = JSON.parseObject(JSON.toJSONString(map), DelayDog.class);
                delayService.dogRun(delayDog);
                System.out.println("消息类型:" + take.getType()+"-->"+ LocalTime.now()+"-->end");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    延时任务生产者–DemoDelayService.interface

    public interface DemoDelayService {
    
        public void demoDelay();
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    延时任务生产者–DemoDelayServiceImpl.java

    @Service
    public class DemoDelayServiceImpl implements DemoDelayService{
        @Override
        public void demoDelay() {
    
            DelayDog delayDog = new DelayDog();
            delayDog.setName("小黑");
            delayDog.setColor("黑色");
            delayDog.setAge(3);
            delayDog.setFood("鸭腿");
    
    //        HashMap msgMap = new HashMap<>();
    //        BeanUtils.copyProperties(delayDog,msgMap);
    
            HashMap<String, Object> msgMap = JSON.parseObject(JSON.toJSONString(delayDog), HashMap.class);
    
            System.out.println("map内容: "+msgMap.toString());
            Integer delayTime=5000;
            // 添加延时消息,msg 延时5s= 5*1000
            Message msg = new Message( delayTime,"延时任务1");
            msg.setMsgBody(msgMap);
    
            DelayQueueManager.getDelayQueue().offer(msg);
            if(!Consumer.isRun){
                // 启动消费线程
                System.out.println("延时任务加入队列 "+ LocalTime.now());
                new Thread(new Consumer(DelayQueueManager.getDelayQueue())).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    结果

    map内容: {color=黑色, name=小黑, age=3, food=鸭腿}
    延时任务加入队列 12:11:09.644
    消息类型:延时任务1-->12:08:03.581-->start
    消息内容: {color=黑色, name=小黑, age=3, food=鸭腿}
    限时任务开始执行!
    小黑喜欢吃鸭腿
    限时任务结束!
    消息类型:延时任务1-->12:08:03.585-->end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    ASP.NET Core如何知道一个请求执行了哪些中间件?
    【数据科学】Keras[Keras、数据、模型架构、预处理、审视模型、编译模型、模型训练、评估模型性能、预测、保存/加载模型、模型微调]
    docker commit 的简单使用
    java毕业设计车险销售管理系统mybatis+源码+调试部署+系统+数据库+lw
    Python3实用安装教程
    Python科学计算与可视化 C1
    @hook扩展分析
    Hive之内部表外部表和分区表分桶表
    电子科技大学《数据库原理及应用》(持续更新)
    Bigemap 在生态环境督察工作中的应用
  • 原文地址:https://blog.csdn.net/m0_46636892/article/details/127738573