• 盘点JAVA中延时任务的几种实现方式


    场景描述

    ①需要实现一个定时发布系统通告的功能,如何实现? ②支付超时,订单自动取消,如何实现?

    实现方式

    一、挂起线程

    推荐指数:★★☆ 优点: JDK原生(JUC包下)支持,无需引入新的依赖; 缺点: (1)基于内存,应用重启(或宕机)会导致任务丢失 (2)基于内存挂起线程实现延时,不支持集群 (3)代码耦合性大,不易维护 (4)一个任务就要新建一个线程绑定任务的执行,容易造成资源浪费

    ①配置延迟任务专用线程池

    1. /**
    2. * 线程池配置
    3. */
    4. @Configuration
    5. @EnableAsync
    6. @EnableConfigurationProperties(ThreadPoolProperties.class)
    7. public class ThreadPoolConfig {
    8. //ThreadPoolProperties的配置依据需求和服务器配置自行配置
    9. @Resource
    10. private ThreadPoolProperties threadPoolProperties;
    11. //延迟任务队列容量
    12. private final static int DELAY_TASK_QUEUE_CAPACITY = 100;
    13. @Bean
    14. public ThreadPoolTaskExecutor delayTaskExecutor() {
    15. log.info("start delayTaskExecutor");
    16. ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
    17. //配置核心线程数
    18. threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
    19. //配置最大线程数
    20. threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
    21. //配置队列大小
    22. threadPool.setQueueCapacity(DELAY_TASK_QUEUE_CAPACITY);
    23. //线程最大存活时间
    24. threadPool.setKeepAliveSeconds (threadPoolProperties.getKeepAliveSeconds());
    25. //配置线程池中的线程的名称前缀
    26. threadPool.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
    27. // rejection-policy:当pool已经达到max size的时候执行的策略
    28. threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    29. //执行初始化
    30. threadPool.initialize();
    31. return threadPool;
    32. }
    33. }
    34. 复制代码

    ②创建延时任务

    在需要执行的代码块创建延时任务

    1. delayTaskExecutor.execute(() -> {
    2. try {
    3. //线程挂起指定时间
    4. TimeUnit.MINUTES.sleep(time);
    5. //执行业务逻辑
    6. doSomething();
    7. } catch (InterruptedException e) {
    8. log.error("线程被打断,执行业务逻辑失败");
    9. }
    10. });
    11. 复制代码

    二、ScheduledExecutorService 延迟任务线程池

    推荐指数:★★★ 优点: 代码简洁,JDK原生支持 缺点: (1)基于内存,应用重启(或宕机)会导致任务丢失 (2)基于内存存放任务,不支持集群 (3)一个任务就要新建一个线程绑定任务的执行,容易造成资源浪费

    1. class Task implements Runnable{
    2. @Override
    3. public void run() {
    4. System.out.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
    5. System.out.println("scheduledExecutorService====>>>延时器");
    6. }
    7. }
    8. public class ScheduleServiceTest {
    9. public static void main(String[] args) {
    10. ScheduledExecutorService scheduledExecutorService=new ScheduledThreadPoolExecutor(10);
    11. scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS);
    12. scheduledExecutorService.schedule(new Task(),2, TimeUnit.SECONDS);
    13. scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS);
    14. }
    15. }
    16. 复制代码

    三、DelayQueue(延时队列)

    推荐指数:★★★☆ 优点: (1)JDK原生(JUC包下)支持,无需引入新的依赖; (2)可以用一个线程对整个延时队列按序执行; 缺点: (1)基于内存,应用重启(或宕机)会导致任务丢失 (2)基于内存存放队列,不支持集群 (3)依据compareTo方法排列队列,调用take阻塞式的取出第一个任务(不调用则不取出),比较不灵活,会影响时间的准确性

    ①新建一个延时任务

    1. public class DelayTask implements Delayed {
    2. private Integer taskId;
    3. private long executeTime;
    4. DelayTask(Integer taskId, long executeTime) {
    5. this.taskId = taskId;
    6. this.executeTime = executeTime;
    7. }
    8. /**
    9. * 该任务的延时时长
    10. * @param unit
    11. * @return
    12. */
    13. @Override
    14. public long getDelay(TimeUnit unit) {
    15. return executeTime - System.currentTimeMillis();
    16. }
    17. @Override
    18. public int compareTo(Delayed o) {
    19. DelayTask t = (DelayTask) o;
    20. if (this.executeTime - t.executeTime <= 0) {
    21. return -1;
    22. } else {
    23. return 1;
    24. }
    25. }
    26. @Override
    27. public String toString() {
    28. return "延时任务{" +
    29. "任务编号=" + taskId +
    30. ", 执行时间=" + new Date(executeTime) +
    31. '}';
    32. }
    33. /**
    34. * 执行具体业务代码
    35. */
    36. public void doTask(){
    37. System.out.println(this+":");
    38. System.out.println("线程ID-"+Thread.currentThread().getId()+":线程名称-"+Thread.currentThread().getName()+":do something!");
    39. }
    40. }
    41. 复制代码

    ②执行延时任务

    1. public class TestDelay {
    2. public static void main(String[] args) throws InterruptedException {
    3. // 新建3个任务,并依次设置超时时间为 30s 10s 60s
    4. DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 3000L);
    5. DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 1000L);
    6. DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 6000L);
    7. DelayQueue<DelayTask> queue = new DelayQueue<>();
    8. queue.add(d1);
    9. queue.add(d2);
    10. queue.add(d3);
    11. System.out.println("开启延时队列时间:" + new Date()+"\n");
    12. // 从延时队列中获取元素
    13. while (!queue.isEmpty()) {
    14. queue.take().doTask();
    15. }
    16. System.out.println("\n任务结束");
    17. }
    18. }
    19. 复制代码

    执行结果:

    四、Redis-为key指定超时时长,并监听失效key

    推荐指数:★★★☆ 优点: 对于有依赖redis的业务且有延时任务的需求,能够快速对接 缺点: (1)客户端断开后重连会导致所有事件丢失 (2)高并发场景下,存在大量的失效key场景会导出失效时间存在延迟 (3)若有多个监听器监听该key,是会重复消费这个过期事件的,需要特定逻辑判断

    ① 修改Redis配置文件并重启Redis

    1. notify-keyspace-events Ex
    2. 复制代码

    注意: redis配置文件不能有空格,否则会启动报错

    ②Java中关于Redis的配置类

    redisTemplate实例bean需要自定义生成; RedisMessageListenerContainer 是redis-key过期监听需要的监听器容器;

    1. @Configuration
    2. @Slf4j
    3. public class RedisConfiguration {
    4. /**
    5. * Redis配置
    6. * @param factory
    7. * @return
    8. */
    9. @Bean(name = "redisTemplate")
    10. public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) {
    11. RedisTemplate<Object, Object> template = new RedisTemplate<>();
    12. RedisSerializer<String> redisSerializer = new StringRedisSerializer();
    13. template.setConnectionFactory(factory);
    14. //key序列化方式
    15. template.setKeySerializer(redisSerializer);
    16. //value序列化
    17. template.setValueSerializer(redisSerializer);
    18. //value hashmap序列化
    19. template.setHashValueSerializer(redisSerializer);
    20. //key hashmap序列化
    21. template.setHashKeySerializer(redisSerializer);
    22. return template;
    23. }
    24. /**
    25. * 消息监听器容器bean
    26. * @param connectionFactory
    27. * @return
    28. */
    29. @Bean
    30. public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
    31. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    32. container.setConnectionFactory(connectionFactory);
    33. return container;
    34. }
    35. }
    36. 复制代码

    ③监听器代码

    1. @Slf4j
    2. @Component
    3. public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    4. private static final String TEST_REDIS_KEY = "testExpired";
    5. public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer,
    6. RedisTemplate redisTemplate) {
    7. super(listenerContainer);
    8. /**
    9. * 设置一个Redis延迟过期key(key名:testExpired,过期时间:30秒)
    10. */
    11. redisTemplate.opsForValue().set(TEST_REDIS_KEY, "1", 20, TimeUnit.SECONDS);
    12. log.info("设置redis-key");
    13. }
    14. @Override
    15. public void onMessage(Message message, byte[] pattern) {
    16. try {
    17. String expiredKey = message.toString();
    18. if (TEST_REDIS_KEY.equals(expiredKey)) {
    19. //业务处理
    20. log.info(expiredKey + "过期,触发回调");
    21. }
    22. } catch (Exception e) {
    23. log.error("key 过期通知处理异常,{}", e);
    24. }
    25. }
    26. }
    27. 复制代码

    测试结果:

    五、时间轮

    推荐指数:★★★★ 优点: (1)对于大量定时任务,时间轮可以仅用一个工作线程对编排的任务进行顺序运行; (2)自动运行,可以自定义时间轮每轮的tick数,tick间隔,灵活且时间精度可控 缺点: (1)基于内存,应用重启(或宕机)会导致任务丢失 (2)基于内存存放任务,不支持集群

    1. public class WheelTimerTest {
    2. public static void main(String[] args) {
    3. //设置每个格子是 100ms, 总共 256 个格子
    4. HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);
    5. //加入三个任务,依次设置超时时间是 10s 5s 20s
    6. System.out.println("加入一个任务,ID = 1, time= " + LocalDateTime.now());
    7. hashedWheelTimer.newTimeout(timeout -> {
    8. System.out.println(Thread.currentThread().getName());
    9. System.out.println("执行一个任务,ID = 1, time= " + LocalDateTime.now());
    10. }, 10, TimeUnit.SECONDS);
    11. System.out.println("加入一个任务,ID = 2, time= " + LocalDateTime.now());
    12. hashedWheelTimer.newTimeout(timeout -> {
    13. System.out.println(Thread.currentThread().getName());
    14. System.out.println("执行一个任务,ID = 2, time= " + LocalDateTime.now());
    15. }, 5, TimeUnit.SECONDS);
    16. System.out.println("加入一个任务,ID = 3, time= " + LocalDateTime.now());
    17. hashedWheelTimer.newTimeout(timeout -> {
    18. System.out.println(Thread.currentThread().getName());
    19. System.out.println("执行一个任务,ID = 3, time= " + LocalDateTime.now());
    20. }, 20, TimeUnit.SECONDS);
    21. System.out.println("加入一个任务,ID = 4, time= " + LocalDateTime.now());
    22. hashedWheelTimer.newTimeout(timeout -> {
    23. System.out.println(Thread.currentThread().getName());
    24. System.out.println("执行一个任务,ID = 4, time= " + LocalDateTime.now());
    25. }, 20, TimeUnit.SECONDS);
    26. System.out.println("等待任务执行===========");
    27. }
    28. }
    29. 复制代码

    六、消息队列-延迟队列

    针对任务丢失的代价过大,高并发的场景 推荐指数:★★★★ 优点: 支持集群,分布式,高并发场景; 缺点: 引入额外的消息队列,增加项目的部署和维护的复杂度。

    场景:为一个委托指定期限,委托到期后,委托关系终止,相关业务权限移交回原拥有者 这里采用的是RabbitMq的死信队列加TTL消息转化为延迟队列的方式(RabbitMq没有延时队列)

    ①声明一个队列设定其的死信队列

    1. @Configuration
    2. public class MqConfig {
    3. public static final String GLOBAL_RABBIT_TEMPLATE = "rabbitTemplateGlobal";
    4. public static final String DLX_EXCHANGE_NAME = "dlxExchange";
    5. public static final String AUTH_EXCHANGE_NAME = "authExchange";
    6. public static final String DLX_QUEUE_NAME = "dlxQueue";
    7. public static final String AUTH_QUEUE_NAME = "authQueue";
    8. public static final String DLX_AUTH_QUEUE_NAME = "dlxAuthQueue";
    9. @Bean
    10. @Qualifier(GLOBAL_RABBIT_TEMPLATE)
    11. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    12. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    13. return rabbitTemplate;
    14. }
    15. @Bean
    16. @Qualifier(AUTH_EXCHANGE_NAME)
    17. public Exchange authExchange() {
    18. return ExchangeBuilder.directExchange (AUTH_EXCHANGE_NAME).durable (true).build ();
    19. }
    20. /**
    21. * 死信交换机
    22. * @return
    23. */
    24. @Bean
    25. @Qualifier(DLX_EXCHANGE_NAME)
    26. public Exchange dlxExchange() {
    27. return ExchangeBuilder.directExchange (DLX_EXCHANGE_NAME).durable (true).build ();
    28. }
    29. /**
    30. * 记录日志的死信队列
    31. * @return
    32. */
    33. @Bean
    34. @Qualifier(DLX_QUEUE_NAME)
    35. public Queue dlxQueue() {
    36. // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
    37. return QueueBuilder.durable (DLX_QUEUE_NAME).build ();
    38. }
    39. /**
    40. * 委托授权专用队列
    41. * @return
    42. */
    43. @Bean
    44. @Qualifier(AUTH_QUEUE_NAME)
    45. public Queue authQueue() {
    46. return QueueBuilder
    47. .durable (AUTH_QUEUE_NAME)
    48. .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
    49. .withArgument("x-dead-letter-routing-key", "dlx_auth")
    50. .build ();
    51. }
    52. /**
    53. * 委托授权专用死信队列
    54. * @return
    55. */
    56. @Bean
    57. @Qualifier(DLX_AUTH_QUEUE_NAME)
    58. public Queue dlxAuthQueue() {
    59. // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
    60. return QueueBuilder
    61. .durable (DLX_AUTH_QUEUE_NAME)
    62. .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
    63. .withArgument("x-dead-letter-routing-key", "dlx_key")
    64. .build ();
    65. }
    66. @Bean
    67. public Binding bindDlxQueueExchange(@Qualifier(DLX_QUEUE_NAME) Queue dlxQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
    68. return BindingBuilder.bind (dlxQueue).to (dlxExchange).with ("dlx_key").noargs ();
    69. }
    70. /**
    71. * 委托授权专用死信队列绑定关系
    72. * @param dlxAuthQueue
    73. * @param dlxExchange
    74. * @return
    75. */
    76. @Bean
    77. public Binding bindDlxAuthQueueExchange(@Qualifier(DLX_AUTH_QUEUE_NAME) Queue dlxAuthQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
    78. return BindingBuilder.bind (dlxAuthQueue).to (dlxExchange).with ("dlx_auth").noargs ();
    79. }
    80. /**
    81. * 委托授权专用队列绑定关系
    82. * @param authQueue
    83. * @param authExchange
    84. * @return
    85. */
    86. @Bean
    87. public Binding bindAuthQueueExchange(@Qualifier(AUTH_QUEUE_NAME) Queue authQueue, @Qualifier(AUTH_EXCHANGE_NAME) Exchange authExchange){
    88. return BindingBuilder.bind (authQueue).to (authExchange).with ("auth").noargs ();
    89. }
    90. }
    91. 复制代码

    ②发送含过期时间的消息

    向授权交换机,发送路由为"auth"的消息(指定了业务所需的超时时间) =》发向MqConfig.AUTH_QUEUE_NAME 队列

    1. rabbitTemplate.convertAndSend(MqConfig.AUTH_EXCHANGE_NAME, "auth", "类型:END,信息:{id:1,fromUserId:111,toUserId:222,beginData:20201204,endData:20211104}", message -> {
    2. /**
    3. * MessagePostProcessor:消息后置处理
    4. * 为消息设置属性,然后返回消息,相当于包装消息的类
    5. */
    6. //业务逻辑:过期时间=xxxx
    7. String ttl = "5000";
    8. //设置消息的过期时间
    9. message.getMessageProperties ().setExpiration (ttl);
    10. return message;
    11. });
    12. 复制代码

    ③超时后队列MqConfig.AUTH_QUEUE_NAME会将消息转发至其配置的死信路由"dlx_auth",监听该死信队列即可消费定时的消息

    1. /**
    2. * 授权定时处理
    3. * @param channel
    4. * @param message
    5. */
    6. @RabbitListener(queues = MqConfig.DLX_AUTH_QUEUE_NAME)
    7. public void dlxAuthQ(Channel channel, Message message) throws IOException {
    8. System.out.println ("\n死信原因:" + message.getMessageProperties ().getHeaders ().get ("x-first-death-reason"));
    9. //1.判断消息类型:1.BEGIN 2.END
    10. try {
    11. //2.1 类型为授权到期(END)
    12. //2.1.1 修改报件办理人
    13. //2.1.2 修改授权状态为0(失效)
    14. //2.2 类型为授权开启(BEGIN)
    15. //2.2.1 修改授权状态为1(开启)
    16. System.out.println (new String(message.getBody (), Charset.forName ("utf8")));
    17. channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
    18. System.out.println ("已处理,授权相关信息修改成功");
    19. } catch (Exception e) {
    20. //拒签消息
    21. channel.basicNack (message.getMessageProperties ().getDeliveryTag (), false, false);
    22. System.out.println ("授权相关信息处理失败, 进入死信队列记录日志");
    23. }
    24. }
  • 相关阅读:
    阿里云布置net core 项目
    安卓期末大作业——日记APP
    React核心概念
    MyBatis基本用法 && 什么是自动化测试 && Spring事务和事务传播机制 && 性能测试概念和术语 && Loadrunner安装
    实例036:算素数
    GTK渲染摄像头图像数据
    30 | 工欲善其事必先利其器:后端性能测试工具原理与行业常用工具简介
    MaxKey单点登录认证系统v3.5.10GA发布
    【数据结构与算法】简单排序
    Vue3+移动端适配屏幕+默认横屏展示
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/128145764