• elastic-job源码(1)- job自动装配


    版本:3.1.0-SNAPSHOT
     
    Maven 坐标
    1
    2
    3
    4
    5
        org.apache.shardingsphere.elasticjob
        elasticjob-lite-spring-boot-starter
        ${latest.version}
     
    Spring.factories配置
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

    在添加elasticjob-lite-spring-boot-starter启动类的时候,会自动加载ElasticJobLiteAutoConfiguration,接下来看下ElasticJobLiteAutoConfiguration中所做的处理。
     
    ElasticJobLiteAutoConfiguration.java
    复制代码
    /**
     * ElasticJob-Lite auto configuration.
     */
    @Configuration(proxyBeanMethods = false)
    @AutoConfigureAfter(DataSourceAutoConfiguration.class)
    
    
    /**
     * elastic job 开关
     * elasticjob.enabled.ture默认为true
     */
    @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)
    
    
    /**
     * 导入
     * ElasticJobRegistryCenterConfiguration.class 注册中心配置
     * ElasticJobTracingConfiguration.class job事件追踪配置
     * ElasticJobSnapshotServiceConfiguration.class 快照配置
     */
    @Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})
    
    
    /**
     * job相关配置信息
     */
    @EnableConfigurationProperties(ElasticJobProperties.class)
    public class ElasticJobLiteAutoConfiguration {
        
        @Configuration(proxyBeanMethods = false)
        /**
         * ElasticJobBootstrapConfiguration.class  创建job beans 注入spring容器
         * ScheduleJobBootstrapStartupRunner.class  执行类型为ScheduleJobBootstrap.class 的job开始运行
         */
        @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
        protected static class ElasticJobConfiguration {
        }
    }
    复制代码

    Elastic-job 是利用zookeeper 实现分布式job的功能,所以在自动装配的时候,需要有zookeeper注册中心的配置。
    自动装配主要做了4件事事
    1.配置zookeeper 客户端信息,启动连接zookeeper.
    2.配置事件追踪数据库,用于保存job运行记录
    3.解析所有job配置文件,将所有job的bean放置在spring 单例bean中
    4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.
     
    第一件事:配置zookeeper 客户端信息,启动连接zookeeper.
    ZookeeperRegistryCenter.class
    复制代码
    public void init() {
        log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                //设置zookeeper 服务器地址
                .connectString(zkConfig.getServerLists())
                //设置重试机制
                .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
                //设置命名空间,zookeeper节点名称
                .namespace(zkConfig.getNamespace());
        //设置session超时时间
        if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
            builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
        }
        //设置连接超时时间
        if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
            builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
        }
        if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
            builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                    .aclProvider(new ACLProvider() {
                    
                        @Override
                        public List getDefaultAcl() {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    
                        @Override
                        public List getAclForPath(final String path) {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    });
        }
        client = builder.build();
        //zookeeper 客户端开始启动
        client.start();
        try {
            //zookeeper 客户端一直连接
            if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                client.close();
                throw new KeeperException.OperationTimeoutException();
            }
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }
    复制代码

     

    第二件事: 配置事件追踪数据库,用于保存job运行记录

    ElasticJobTracingConfiguration.java

     

    复制代码
    /**
     * Create a bean of tracing DataSource.
     *
     * @param tracingProperties tracing Properties
     * @return tracing DataSource
     */
    @Bean("tracingDataSource")
    //spring中注入bean name 为tracingDataSource的job数据库连接信息
    public DataSource tracingDataSource(final TracingProperties tracingProperties) {
        //获取elastic-job 数据库配置
        DataSourceProperties dataSource = tracingProperties.getDataSource();
        if (dataSource == null) {
            return null;
        }
        HikariDataSource tracingDataSource = new HikariDataSource();
        tracingDataSource.setJdbcUrl(dataSource.getUrl());
        BeanUtils.copyProperties(dataSource, tracingDataSource);
        return tracingDataSource;
    }
    
    
    /**
     * Create a bean of tracing configuration.
     *
     * @param dataSource required by constructor
     * @param tracingDataSource tracing ataSource
     * @return a bean of tracing configuration
     */
    @Bean
    @ConditionalOnBean(DataSource.class)
    @ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
    public TracingConfiguration tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
        /**
         * dataSource 是业务数据库
         * tracingDataSource 是job数据库
         * 当配置elasticjob.tracing.type = RDB时,如果单独配置job数据库是,默认使用job数据库作为job运行轨迹的记录
         * 但这边同时业务数据库和job追踪数据库同时注入是,mybatis-plus 结合@Table 使用的时候,很有可能找不到正确对应的数据源
         */
        DataSource ds = tracingDataSource;
        if (ds == null) {
            ds = dataSource;
        }
        return new TracingConfiguration<>("RDB", ds);
    }
    复制代码

     

    通过elasticjob.tracing.type=RDB的配置开启事件追踪功能,这边job的事件追踪数据源可以和业务数据源配置不一样。

     

    第三件事:解析所有job配置文件

    ElasticJobBootstrapConfiguration.class

     

    复制代码
    public void createJobBootstrapBeans() {
        //获取job配置
        ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
        //获取单利注册对象
        SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
        //获取注入zookeeper 客户端
        CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
        //获取job事件追踪
        TracingConfiguration tracingConfig = getTracingConfiguration();
        //构造JobBootstraps
        constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
    }
    复制代码

    重要的是constructJobBootstraps 这个方法,来看下

    复制代码
    private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                        final CoordinatorRegistryCenter registryCenter, final TracingConfiguration tracingConfig) {
        //遍历配置的每一个job
        for (Map.Entry entry : elasticJobProperties.getJobs().entrySet()) {
            ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
            //校验 job class 和 type 都为空抛异常
            Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                            || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                    "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
            //校验 job class 和 type 都有 报相互排斥
            Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                            || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                    "[elasticJobClass] and [elasticJobType] are mutually exclusive.");
    
    
            if (null != jobConfigurationProperties.getElasticJobClass()) {
                //通过class 注入job
                registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
            } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
                //通过type 注入job
                registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
            }
        }
    }
    复制代码

    Job 有两种类型的注入,第一种是是class,配置成job的全路径信息注入
     
    再来看看registerClassedJob 方法里的内容
    复制代码
    private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                    final TracingConfiguration tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
        //获取job配置
        JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
        //配置job事件追踪
        jobExtraConfigurations(jobConfig, tracingConfig);
        //获取job类型
        ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
        //没有配置cron表达式 就初始化为OneOffJobBootstrap对象,一次性任务
        if (Strings.isNullOrEmpty(jobConfig.getCron())) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
            singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
        } else {
            //有配置cron表达式 就初始化为ScheduleJobBootstrap对象,定时任务
            //设置bean name
            String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
            //注入ScheduleJobBootstrap对象为单利对象
            singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
        }
    }
    复制代码

    Class 类型注入的job有两种类型
    1.ScheduleJobBootstrap:定时任务类型的job。
    2.OneOffJobBootstrap:一定次job类型。
     
    看下定义的new ScheduleJobBootstrap 方法
    复制代码
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
        Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
        this.regCenter = regCenter;
        //获取job监听器
        Collection jobListeners = getElasticJobListeners(jobConfig);
        // 集成所有操作zookeeper 节点的services,job 监听器
        setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
        //获取当前job名称
        String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
        //zookeeper节点 {namespace}/{jobclassname}/config 放置job配置信息
        this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
        // 集成所有操作zookeeper 节点的services
        schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
        jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
        //检验job配置
        validateJobProperties();
        //定义job执行器
        jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
        //监听器里注入GuaranteeService
        setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
        //创建定时任务,开始执行
        jobScheduleController = createJobScheduleController();
    }
    复制代码

     

    看下createJobScheduleController

    复制代码
    private JobScheduleController createJobScheduleController() {
        JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
        //注册job
        JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
        //注册器开始运行
        registerStartUpInfo();
        return result;
    }
    复制代码

    看下registerStartUpInfo方法

    复制代码
    public void registerStartUpInfo(final boolean enabled) {
        //开始所有的监听器
        listenerManager.startAllListeners();
        //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器
        leaderService.electLeader();
        //{namespace}/{ipservers} 设置enable处理
        serverService.persistOnline(enabled);
        //临时节点   /{namespave}/instances 放置运行服务实例信息
        instanceService.persistOnline();
        //开启一个异步服务
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }
    复制代码

    这里实行的操作:
    1.开启所有监听器处理
    2.leader选举
    3.持久化节点数据
    4.开启异步服务
     
    第四步:4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.
     
    复制代码
    @Override
    public void run(final String... args) {
        log.info("Starting ElasticJob Bootstrap.");
        applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
        log.info("ElasticJob Bootstrap started.");
    }
    复制代码

    获取到所有的定时任务job(ScheduleJobBootstrap类型),执行schedule方法,底层实际使用quartz框架运行定时任务。
     
     
     
     
     
  • 相关阅读:
    uniapp 微信小程序之隐私协议开发
    SAP ARFCSTATE ARFCSDATA TRFCQOUT
    python之requests库常用方式
    MVC、MVP、MVVM区别
    Maven-DskipTests和-Dmaven.test.skip=true的区别
    自动驾驶--定位技术
    VTK 三维场景
    python:list和dict的基本操作实例
    23、分布式锁
    数据结构-排序
  • 原文地址:https://www.cnblogs.com/lingyujuan/p/17357040.html