• 【SpringBoot框架篇】31.基于分布式锁或xxx-job实现分布式任务调度


    1.简介

    定时任务使用场景一般为指定时间做数据统计,临时数据清理等等。

    单节点部署的服务一般是通过下面方式实现即可:

    • 在SpringBoot启动类上面添加@EnableScheduling注解开启spring定时任务功能
    • 在定时任务方法上添加@Scheduled实现
    @SpringBootApplication
    @EnableScheduling
    public class PlatformApplication{} 
    
    @Component
    @Slf4j
    public class ScheduledServer {
        @Scheduled(cron = "0 0 1 * * ?")
        public void insertStatData() {
            log.info("-------------凌晨1点统计前一天的业务数据量--------------
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    以上的配置如果是在服务需要部署多个节点的时候会出现重复执行定时任务导出数据重复的问题,这个时候可以通过分布式锁或使用xxx-job分布式任务调度平台避免这个任务重复执行的问题。

    2.分布式锁实现

    常用的三种实现如下

    • 基于redis的单线程原子性
    • 基于数据库的排它锁
    • 基于ZooKeeper 文件节点实现

    详细信息参考我写的这篇博客: 点我跳转

    本文使用aop+redis优雅的使用分布式锁避免定时任务重复执行。

    2.1.引用依赖

            
                org.springframework.boot
                spring-boot-starter-data-redis
            
    
            
                org.aspectj
                aspectjweaver
                1.9.4
            
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2.2.定义分布式锁注解

    @Target({ElementType.PARAMETER, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface RedisLock {
    
        /**
         * 锁的名称(唯一标识),可选为""时候使用方法的名称
         */
        String name() default "";
    
        /**
         * 重试重获取锁的次数,默认0 不重试
         */
        int retry() default 0;
    
        /**
         * 占有锁的时间,避免程序宕机导致锁无法释放
         */
        int expired() default 60;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    2.3.配置切入点和获取锁释放锁逻辑

    • 下面定义了切入点为RedisLock注解类
    • 在增强处理的环绕通知逻辑里面去执行获取锁和释放锁的逻辑
    @Aspect
    @Slf4j
    @Component
    public class RedisLockPointcut {
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        @Pointcut("@annotation(com.ljm.boot.distributedjob.annotation.RedisLock)")
        public void redisLockPointCut() {
        }
    
        @Around("redisLockPointCut()")
        public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
            Method method = currentMethod(proceedingJoinPoint);
            //获取到方法的注解对象
            RedisLock redisLock = method.getAnnotation(RedisLock.class);
            //获取锁的名称
            String methodName = redisLock.name();
            if (!StringUtils.hasLength(methodName)) {
                //如果注解里没有设置锁的名称,默认使用方法的名称
                methodName = method.getName();
            }
            //获取到锁的标识
            boolean flag = true;
            int retryCount = redisLock.retry();
            do {
                if (!flag && retryCount > 0) {
                    Thread.sleep(1000L);
                    retryCount--;
                }
                flag = stringRedisTemplate.opsForValue().setIfAbsent(methodName, "1", redisLock.expired(), TimeUnit.SECONDS);
                if (flag) {
                    //获取到锁结束循环
                    break;
                }
                //根据配置的重试次数,执行n次获取锁的方法,默认不重试
            } while (retryCount > 0);
    
            //result为连接点的返回结果
            Object result = null;
            if (flag) {
                try {
                    result = proceedingJoinPoint.proceed();
                } catch (Throwable e) {
                    /*异常通知方法*/
                    log.error("异常通知方法>目标方法名{},异常为:{}", method.getName(), e);
                } finally {
                    stringRedisTemplate.delete(methodName);
                }
                return result;
            }
            log.error("执行:{} 未获取锁,重试次数:{}", method.getName(), redisLock.retry());
            return null;
        }
    
        /**
         * 根据切入点获取执行的方法
         */
        private Method currentMethod(JoinPoint joinPoint) {
            String methodName = joinPoint.getSignature().getName();
            //获取目标类的所有方法,找到当前要执行的方法
            Method[] methods = joinPoint.getTarget().getClass().getMethods();
            Method resultMethod = null;
            for (Method method : methods) {
                if (method.getName().equals(methodName)) {
                    resultMethod = method;
                    break;
                }
            }
            return resultMethod;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    2.4.测试任务

    • 只需要在需要使用分布式锁的方法上面加@RedisLock注解即可优雅实现对应功能
    • 在springboot启动类添加@EnableScheduling注解
    @Slf4j
    @Component
    public class RedisLockScheduled {
    
        /**
         * 每分钟执行一次任务,设置分布式锁的名称insertStatData,过期时间为30秒,重试次数为3次
         */
        @Scheduled(cron = "0 */1 * * * ?")
        @RedisLock(name="insertStatData",expired = 30,retry = 3)
        //@RedisLock   也可以不设置属性直接使用,默认分布锁的名称以函数名insertStatData命名,expired和retry用注解定义时候的默认值
        public void insertStatData() {
            try {
                //模拟业务处理线程休眠10秒
                Thread.sleep(10000L);
            }catch (Exception e){
                e.printStackTrace();
            }
            log.info("-------------每分钟打印一次日志--------------");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    使用8031端口和8032端口启动服务两次,等运行一段时间可以看到同一时间端只有一个服务执行了定时任务内的逻辑。

    8031端口服务日志打印
    在这里插入图片描述

    8032端口服务日志打印
    在这里插入图片描述

    由上面图片中日志可以看到基于分布式锁可以控制只有一个节点可以执行任务。

    3.使用分布式任务调度平台xxx-job

    xxx-job的github地址
    本文使用的是2.3.1分支的代码。

    架构图:
    在这里插入图片描述

    3.1.下载源码并运行项目

    在这里插入图片描述

    • 1.xxl-job默认使用的mysql数据库,需要先手动创建名称为xxl_job的数据库

    • 2.导入项目 doc/db/tables_xxl_job.sql数据
      在这里插入图片描述

    • 3.修改配置文件数据库连接信息
      在这里插入图片描述

    • 4.需要配置token用于执行器注册时候鉴权认证使用,默认为default_token

    • 5.启动xxl-job-admin项目后用 http://localhost:8080/xxl-job-admin/ 访问后台
      账号: admin
      密码: 123456
      在这里插入图片描述

    • 6.需要创建执行器
      在这里插入图片描述
      添写完执行器的名称后点击保存按钮
      在这里插入图片描述

    • 7.为执行器创建定时任务
      在这里插入图片描述

    上图中的JobHandler填写为xxlJobTask在执行端的代码里需要用到

    • 8.启动定时任务
      在这里插入图片描述

    3.2.springBoot项目集成xxl-job

    • 1.在pom文件中引入依赖
            
                com.xuxueli
                xxl-job-core
                2.3.1
            
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 2.配置xxl-job服务端信息
      在application.yml中添加下面配置
    xxlJob:
      #xxl-job服务端配置文件中定义好的token
      accessToken: default_token
      #xxl-job服务端地址(用于注册执行器使用)
      adminAddresses: http://127.0.0.1:8080/xxl-job-admin
      executor:
        # 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
        appname: testJob
        # 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP
        ip:
        # 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999
        port: 0
        # 执行器运行日志文件存储磁盘路径 [选填]
        logpath: logs/xxlJob
        # 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; -1关闭自动清理功能;
        logretentiondays: 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 3.注册xxl-job执行器Bean实例
    @Slf4j
    @Configuration
    public class XxlJobConfig {
    
        @Value("${xxlJob.accessToken}")
        private String accessToken;
    
        @Value("${xxlJob.adminAddresses}")
        private String adminAddresses;
    
        @Value("${xxlJob.executor.appname}")
        private String appName;
    
        @Value("${xxlJob.executor.ip}")
        private String ip;
    
        @Value("${xxlJob.executor.port}")
        private int port;
    
        @Value("${xxlJob.executor.logpath}")
        private String logPath;
    
        @Value("${xxlJob.executor.logretentiondays}")
        private int logRetentionDays;
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            log.info("*****************xxlJobExecutor bean init*****************");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appName);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
            return xxlJobSpringExecutor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 4.启动类添加@EnableScheduling注解
      在这里插入图片描述

    • 5.测试任务的代码

    @Component
    public class XxlJobScheduled {
        @XxlJob("xxlJobTask")
        public ReturnT<String> xxlJobTest(String date) {
            XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
            String jobParam = xxlJobContext.getJobParam();
            try {
                //模拟业务处理线程休眠10秒
                Thread.sleep(10000L);
            }catch (Exception e){
                e.printStackTrace();
            }
            log.info("xxlJobTest定时任务执行成功,jobParam:{}",jobParam);
            return ReturnT.SUCCESS;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 6.运行项目
      使用8031端口和8032端口启动服务两次,看到下图的信息表示执行器注册成功。

    在这里插入图片描述

    运行一段时间观察日志打印情况

    8031端口服务日志如下
    在这里插入图片描述

    8032端口服务如下
    在这里插入图片描述

    由上面图片中日志可以看到只有一个节点可以执行任务。

    4.项目配套代码

    github地址

    创作不易,要是觉得我写的对你有点帮助的话,麻烦在github上帮我点下 Star

    【SpringBoot框架篇】其它文章如下,后续会继续更新。

  • 相关阅读:
    5种限流算法,7种限流方式,挡住突发流量?
    Canal+Kafka实现MySQL与Redis数据同步(一)
    如何给注册中心锦上添花?
    Vue3快速入门
    机械转码日记【23】模板进阶
    Java中List转字符串的方法
    CS8630 无效的 nullable 值: C# 7.3 的“Enable”
    Kubernetes介绍和资源管理
    Node.js--》简易资金管理系统后台项目实战(后端)
    面试—如何介绍项目中的多级缓存?
  • 原文地址:https://blog.csdn.net/ming19951224/article/details/126888345