• Scheduled Tasks Series (8): Quartz Startup Internals, Clustering


    Let's go back once more to the scheduler.start() method:

        public void start() throws SchedulerException {
    
            if (shuttingDown|| closed) {
                throw new SchedulerException(
                        "The Scheduler cannot be restarted after shutdown() has been called.");
            }
    
            // QTZ-212 : calling new schedulerStarting() method on the listeners
            // right after entering start()
            // notify the scheduler listeners that the scheduler is starting
            notifySchedulerListenersStarting();
            // initialStart is null on the very first start, so run first-time initialization
            if (initialStart == null) {
                initialStart = new Date();
                this.resources.getJobStore().schedulerStarted();            
                startPlugins();
            } else {
                resources.getJobStore().schedulerResumed();
            }
    
            schedThread.togglePause(false);
    
            getLog().info(
                    "Scheduler " + resources.getUniqueIdentifier() + " started.");
            // notify the scheduler listeners that the scheduler has started
            notifySchedulerListenersStarted();
        }
    

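    On the first call to start() nothing has been initialized yet (initialStart is null), so the job store goes through schedulerStarted(). If the scheduler has been started before, schedulerResumed() is called instead, and misfire recovery has to be taken into account.
    This time we look at the first-start path, the schedulerStarted() method.
    One thing to be clear about first: RAMJobStore is purely in-memory, so it has no persisted state to recover at startup, and its schedulerStarted() does nothing by default.
    So we go straight to the implementation of schedulerStarted in JobStoreSupport: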

        public void schedulerStarted() throws SchedulerException {
    
            if (isClustered()) {
                // clustered mode
                clusterManagementThread = new ClusterManager();
                if(initializersLoader != null)
                    clusterManagementThread.setContextClassLoader(initializersLoader);
                clusterManagementThread.initialize();
            } else {
                try {
                    // recover any failed or misfired jobs and clean up the data store as appropriate
                    recoverJobs();
                } catch (SchedulerException se) {
                    throw new SchedulerConfigException(
                            "Failure occured during job recovery.", se);
                }
            }
    
            misfireHandler = new MisfireHandler();
            if(initializersLoader != null)
                misfireHandler.setContextClassLoader(initializersLoader);
            misfireHandler.initialize();
            schedulerRunning = true;
            
            getLog().debug("JobStore background threads started (as scheduler was started).");
        }
    

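    In clustered mode the job store does not call recoverJobs() directly. Recovery is handed to the ClusterManager, which checks in and takes over work from failed instances, so the remaining nodes keep scheduling without interruption. In non-clustered mode, recoverJobs() recovers any failed or misfired jobs at startup. In both modes a MisfireHandler is started afterwards.
    In clustered mode a ClusterManager is created here. ClusterManager is an inner class of JobStoreSupport and extends Thread.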
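
    For context, isClustered() reflects scheduler configuration. The sketch below builds the relevant properties in plain Java, without a Quartz dependency. The property keys are the standard Quartz ones as far as I know; the scheduler name and the 15 s interval are illustrative assumptions, and a real setup would also need data source settings.

```java
import java.util.Properties;

class ClusteredQuartzProps {
    // Builds the properties that put a JDBC job store into clustered mode.
    // "DemoClusteredScheduler" and "15000" are illustrative values, not from the article.
    public static Properties clusteredProps() {
        Properties p = new Properties();
        p.setProperty("org.quartz.scheduler.instanceName", "DemoClusteredScheduler");
        // AUTO lets each node generate its own unique instance id
        p.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        p.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        // this is the flag that isClustered() ends up returning
        p.setProperty("org.quartz.jobStore.isClustered", "true");
        // heartbeat period in milliseconds
        p.setProperty("org.quartz.jobStore.clusterCheckinInterval", "15000");
        return p;
    }

    public static void main(String[] args) {
        Properties p = clusteredProps();
        System.out.println("isClustered = " + p.getProperty("org.quartz.jobStore.isClustered"));
    }
}
```

    In an application these properties would typically be passed to a StdSchedulerFactory; all nodes of one cluster share the same scheduler name but need distinct instance ids.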

    	public void initialize() {
    	    // check in (heartbeat) and perform failover if needed
    	    this.manage();
    	
    	    ThreadExecutor executor = getThreadExecutor();
    	    executor.execute(ClusterManager.this);
    	}
    

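    ClusterManager's initialize() method is shown above. It first calls manage() once, synchronously, to perform the heartbeat check-in and any failover, and then hands the ClusterManager thread to the configured ThreadExecutor to run. Next, let's look at the manage method: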

     private boolean manage() {
         boolean res = false;
         try {
    
             res = doCheckin();
    
             numFails = 0;
             getLog().debug("ClusterManager: Check-in complete.");
         } catch (Exception e) {
             if(numFails % 4 == 0) {
                 getLog().error(
                     "ClusterManager: Error managing cluster: "
                             + e.getMessage(), e);
             }
             numFails++;
         }
         return res;
     }
    
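
    One detail in manage() is easy to miss: numFails is reset on every successful check-in, and an error is logged only when numFails is a multiple of 4, so a long database outage does not flood the log. The following is a minimal stand-alone sketch of that counting logic; the class name and the BooleanSupplier parameter are hypothetical stand-ins for doCheckin().

```java
import java.util.function.BooleanSupplier;

class CheckinFailureThrottle {
    private int numFails = 0;      // consecutive failures so far
    private int loggedErrors = 0;  // how many times an error was actually logged

    // Same shape as manage(): reset the counter on success,
    // log only every 4th consecutive failure (numFails = 0, 4, 8, ...).
    public boolean manage(BooleanSupplier checkin) {
        boolean res = false;
        try {
            res = checkin.getAsBoolean();
            numFails = 0;
        } catch (RuntimeException e) {
            if (numFails % 4 == 0) {
                loggedErrors++;
                System.out.println("error logged at consecutive failure #" + (numFails + 1));
            }
            numFails++;
        }
        return res;
    }

    public int loggedErrors() {
        return loggedErrors;
    }

    public static void main(String[] args) {
        CheckinFailureThrottle t = new CheckinFailureThrottle();
        for (int i = 0; i < 9; i++) {
            t.manage(() -> { throw new IllegalStateException("db down"); });
        }
        // failures #1, #5 and #9 are logged
        System.out.println("errors logged: " + t.loggedErrors());
    }
}
```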

    Below is the failover code used in clustered mode:

        /**
         * Quartz cluster check-in and failover
         * @return
         * @throws JobPersistenceException
         */
        protected boolean doCheckin() throws JobPersistenceException {
            boolean transOwner = false;
            boolean transStateOwner = false;
            boolean recovered = false;
    
            Connection conn = getNonManagedTXConnection();
            try {
                // Other than the first time, always checkin first to make sure there is 
                // work to be done before we acquire the lock (since that is expensive, 
                // and is almost never necessary).  This must be done in a separate
                // transaction to prevent a deadlock under recovery conditions.
                List<SchedulerStateRecord> failedRecords = null;
                if (!firstCheckIn) {
                    // 1. check in and find instances whose heartbeat has timed out
                    failedRecords = clusterCheckIn(conn);
                    commitConnection(conn);
                }
                
                if (firstCheckIn || (failedRecords.size() > 0)) {
                    getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
                    transStateOwner = true;
        
                    // Now that we own the lock, make sure we still have work to do. 
                    // The first time through, we also need to make sure we update/create our state record
                    failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
        
                    if (failedRecords.size() > 0) {
                        getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                        //getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
                        transOwner = true;
                        // 2. recover the work of the timed-out instances
                        clusterRecover(conn, failedRecords);
                        recovered = true;
                    }
                }
                
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                throw e;
            } finally {
                try {
                    releaseLock(LOCK_TRIGGER_ACCESS, transOwner);
                } finally {
                    try {
                        releaseLock(LOCK_STATE_ACCESS, transStateOwner);
                    } finally {
                        cleanupConnection(conn);
                    }
                }
            }
    
            firstCheckIn = false;
    
            return recovered;
        }
    
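
    The comments in doCheckin() describe a "check cheaply first, lock only if there is work, then check again under the lock" pattern. Here is a simplified sketch of that idea. It is not the Quartz implementation: an in-process ReentrantLock stands in for the database-backed STATE_ACCESS lock, the first-check-in special case is left out, and all names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

class CheckThenLock {
    // stand-in for the cluster-wide lock (an assumption of this sketch)
    private final ReentrantLock stateLock = new ReentrantLock();
    private int lockAcquisitions = 0;

    // Returns true if recovery ran. findFailed is queried once without the lock
    // and, only if it reported work, once more while holding the lock.
    public boolean checkin(Supplier<List<String>> findFailed) {
        if (findFailed.get().isEmpty()) {
            return false; // common case: nothing to do, so the expensive lock is never taken
        }
        stateLock.lock();
        lockAcquisitions++;
        try {
            List<String> failed = findFailed.get(); // re-check now that we own the lock
            if (failed.isEmpty()) {
                return false; // another node recovered them in the meantime
            }
            System.out.println("recovering " + failed);
            return true;
        } finally {
            stateLock.unlock();
        }
    }

    public int lockAcquisitions() {
        return lockAcquisitions;
    }

    public static void main(String[] args) {
        CheckThenLock c = new CheckThenLock();
        System.out.println(c.checkin(() -> Collections.<String>emptyList()));
        System.out.println(c.checkin(() -> Arrays.asList("node-2")));
        System.out.println("locks taken: " + c.lockAcquisitions());
    }
}
```

    The second query under the lock matters because several nodes can notice the same failed instance at once; only the one that still sees it after acquiring the lock should recover it.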

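    On the first check-in, clusterCheckIn() collects the list of all scheduler instances in the cluster that may have failed and, if this instance has no row yet, inserts one into SCHEDULER_STATE: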

        protected List<SchedulerStateRecord> clusterCheckIn(Connection conn)
            throws JobPersistenceException {
    
            List<SchedulerStateRecord> failedInstances = findFailedInstances(conn);
            
            try {
                // FUTURE_TODO: handle self-failed-out
    
                // check in...
                lastCheckin = System.currentTimeMillis();
                if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
                    //INSERT INTO {0}SCHEDULER_STATE (SCHED_NAME, INSTANCE_NAME, LAST_CHECKIN_TIME, CHECKIN_INTERVAL) VALUES({1}, ?, ?, ?)
                    getDelegate().insertSchedulerState(conn, getInstanceId(),
                            lastCheckin, getClusterCheckinInterval());
                }
                
            } catch (Exception e) {
                throw new JobPersistenceException("Failure updating scheduler state when checking-in: "
                        + e.getMessage(), e);
            }
    
            return failedInstances;
        }
    

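    Quartz signals that an instance is alive by periodically updating the LAST_CHECKIN_TIME column of its row in the qrtz_scheduler_state table. The column is refreshed once per check-in interval, and that interval is stored alongside it in the CHECKIN_INTERVAL column.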
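
    The "update first, insert only if no row was updated" step in clusterCheckIn() can be sketched without a database. In this minimal illustration a HashMap stands in for the qrtz_scheduler_state table; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class HeartbeatTable {
    // instanceId -> last check-in time in ms; stands in for the state table
    private final Map<String, Long> lastCheckin = new HashMap<>();

    // Returns the number of rows updated, like a JDBC UPDATE would.
    private int update(String instanceId, long now) {
        if (!lastCheckin.containsKey(instanceId)) {
            return 0;
        }
        lastCheckin.put(instanceId, now);
        return 1;
    }

    // Update first; insert only when no row existed. Returns true if a row was inserted.
    public boolean checkIn(String instanceId, long now) {
        if (update(instanceId, now) == 0) {
            lastCheckin.put(instanceId, now);
            return true;
        }
        return false;
    }

    public Long lastCheckinOf(String instanceId) {
        return lastCheckin.get(instanceId);
    }

    public static void main(String[] args) {
        HeartbeatTable table = new HeartbeatTable();
        System.out.println("inserted: " + table.checkIn("node-1", 1_000L)); // first check-in creates the row
        System.out.println("inserted: " + table.checkIn("node-1", 16_000L)); // later check-ins only update it
        System.out.println("last check-in: " + table.lastCheckinOf("node-1"));
    }
}
```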

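    The two most important steps in failover are:

    • Find the instances that may have failed
    • Take over the unfinished work of those failed instances

    The methods to focus on next are findFailedInstances and clusterRecover:
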
        /**
         * Get a list of all scheduler instances in the cluster that may have failed.
         * This includes this scheduler if it is checking in for the first time.
         */
        protected List<SchedulerStateRecord> findFailedInstances(Connection conn)
            throws JobPersistenceException {
            try {
                List<SchedulerStateRecord> failedInstances = new LinkedList<SchedulerStateRecord>();
                boolean foundThisScheduler = false;
                long timeNow = System.currentTimeMillis();
                // first load the state records of all scheduler instances
                List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null);
    
                for(SchedulerStateRecord rec: states) {
                    // on the first check-in, this instance's own record is included
                    // other instances count as failed when calcFailedIfAfter(rec) < now
                    // find own record...
                    if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
                        foundThisScheduler = true;
                        if (firstCheckIn) {
                            failedInstances.add(rec);
                        }
                    } else {
                        // find failed instances...
                        if (calcFailedIfAfter(rec) < timeNow) {
                            failedInstances.add(rec);
                        }
                    }
                }
                
                // The first time through, also check for orphaned fired triggers.
                if (firstCheckIn) {
                    failedInstances.addAll(findOrphanedFailedInstances(conn, states));
                }
                
                // If not the first time but we didn't find our own instance, then
                // Someone must have done recovery for us.
                if ((!foundThisScheduler) && (!firstCheckIn)) {
                    // FUTURE_TODO: revisit when handle self-failed-out impl'ed (see FUTURE_TODO in clusterCheckIn() below)
                    getLog().warn(
                        "This scheduler instance (" + getInstanceId() + ") is still " + 
                        "active but was recovered by another instance in the cluster.  " +
                        "This may cause inconsistent behavior.");
                }
                
                return failedInstances;
            } catch (Exception e) {
                lastCheckin = System.currentTimeMillis();
                throw new JobPersistenceException("Failure identifying failed instances when checking-in: "
                        + e.getMessage(), e);
            }
        }
    

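    First, the state records of all scheduler instances are loaded. On the first check-in, the current instance's own record is added to the list, together with any orphaned instances that still have fired-trigger records. Every other instance is treated as failed when calcFailedIfAfter(rec) < timeNow, that is, when its last check-in time plus a tolerance threshold already lies in the past.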
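
    The body of calcFailedIfAfter is not shown in this article, so the following is only a sketch of the comparison under a simplifying assumption: the deadline is the last check-in time plus the check-in interval plus a fixed grace period. The grace parameter and all names are hypothetical.

```java
class FailureDetector {
    // Deadline after which an instance is considered failed.
    // Assumed form: last check-in + check-in interval + grace period.
    public static long failedIfAfter(long lastCheckinMs, long intervalMs, long graceMs) {
        return lastCheckinMs + intervalMs + graceMs;
    }

    // Same comparison as in findFailedInstances: failed if the deadline is already in the past.
    public static boolean isFailed(long lastCheckinMs, long intervalMs, long graceMs, long nowMs) {
        return failedIfAfter(lastCheckinMs, intervalMs, graceMs) < nowMs;
    }

    public static void main(String[] args) {
        long lastCheckin = 0L, interval = 15_000L, grace = 7_500L;
        // deadline is 22_500 ms
        System.out.println(isFailed(lastCheckin, interval, grace, 20_000L)); // false: still within tolerance
        System.out.println(isFailed(lastCheckin, interval, grace, 23_000L)); // true: heartbeat is overdue
    }
}
```

    The grace period keeps a node from being declared dead just because one heartbeat arrived slightly late, for example during a GC pause or a slow database round trip.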

        protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
            throws JobPersistenceException {
    
            if (failedInstances.size() > 0) {
    
                long recoverIds = System.currentTimeMillis();
    
                logWarnIfNonZero(failedInstances.size(),
                        "ClusterManager: detected " + failedInstances.size()
                                + " failed or restarted instances.");
                try {
                    for (SchedulerStateRecord rec : failedInstances) {
                        getLog().info(
                                "ClusterManager: Scanning for instance \""
                                        + rec.getSchedulerInstanceId()
                                        + "\"'s failed in-progress jobs.");
                        // query the fired triggers that this instance had not finished processing
                        //SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?
                        List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                                .selectInstancesFiredTriggerRecords(conn,
                                        rec.getSchedulerInstanceId());
    
                        int acquiredCount = 0;
                        int recoveredCount = 0;
                        int otherCount = 0;
    
                        Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
    
                        for (FiredTriggerRecord ftRec : firedTriggerRecs) {
                            // these are the failed instance's fired triggers; the current instance takes them over, which means updating some trigger states
                            TriggerKey tKey = ftRec.getTriggerKey();
                            JobKey jKey = ftRec.getJobKey();
    
                            triggerKeys.add(tKey);
    
                            // release blocked triggers..
                            if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                                getDelegate()
                                        .updateTriggerStatesForJobFromOtherState(
                                                conn, jKey,
                                                STATE_WAITING, STATE_BLOCKED);
                            } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                                getDelegate()
                                        .updateTriggerStatesForJobFromOtherState(
                                                conn, jKey,
                                                STATE_PAUSED, STATE_PAUSED_BLOCKED);
                            }
    
                            // release acquired triggers..
                            if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                                getDelegate().updateTriggerStateFromOtherState(
                                        conn, tKey, STATE_WAITING,
                                        STATE_ACQUIRED);
                                acquiredCount++;
                            } else if (ftRec.isJobRequestsRecovery()) {
                                // handle jobs marked for recovery that were not fully
                                // executed..
                                //SELECT JOB_NAME FROM {0}JOB_DETAILS WHERE SCHED_NAME={1} AND JOB_NAME = ? AND JOB_GROUP = ?
                                if (jobExists(conn, jKey)) {
                                    @SuppressWarnings("deprecation")
                                    SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                            "recover_"
                                                    + rec.getSchedulerInstanceId()
                                                    + "_"
                                                    + String.valueOf(recoverIds++),
                                            Scheduler.DEFAULT_RECOVERY_GROUP,
                                            new Date(ftRec.getScheduleTimestamp()));
                                    rcvryTrig.setJobName(jKey.getName());
                                    rcvryTrig.setJobGroup(jKey.getGroup());
                                    rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                                    rcvryTrig.setPriority(ftRec.getPriority());
                                    JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                                    jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                                    jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                                    jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                                    jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                                    rcvryTrig.setJobDataMap(jd);
    
                                    rcvryTrig.computeFirstFireTime(null);
                                    storeTrigger(conn, rcvryTrig, null, false,
                                            STATE_WAITING, false, true);
                                    recoveredCount++;
                                } else {
                                    getLog()
                                            .warn(
                                                    "ClusterManager: failed job '"
                                                            + jKey
                                                            + "' no longer exists, cannot schedule recovery.");
                                    otherCount++;
                                }
                            } else {
                                otherCount++;
                            }
    
                            // free up stateful job's triggers
                            if (ftRec.isJobDisallowsConcurrentExecution()) {
                                getDelegate()
                                        .updateTriggerStatesForJobFromOtherState(
                                                conn, jKey,
                                                STATE_WAITING, STATE_BLOCKED);
                                getDelegate()
                                        .updateTriggerStatesForJobFromOtherState(
                                                conn, jKey,
                                                STATE_PAUSED, STATE_PAUSED_BLOCKED);
                            }
                        }
    
                        getDelegate().deleteFiredTriggers(conn,
                                rec.getSchedulerInstanceId());
    
                        // Check if any of the fired triggers we just deleted were the last fired trigger
                        // records of a COMPLETE trigger.
                        int completeCount = 0;
                        for (TriggerKey triggerKey : triggerKeys) {
    
                            if (getDelegate().selectTriggerState(conn, triggerKey).
                                    equals(STATE_COMPLETE)) {
                                List<FiredTriggerRecord> firedTriggers =
                                        getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                                if (firedTriggers.isEmpty()) {
    
                                    if (removeTrigger(conn, triggerKey)) {
                                        completeCount++;
                                    }
                                }
                            }
                        }
    
                        logWarnIfNonZero(acquiredCount,
                                "ClusterManager: ......Freed " + acquiredCount
                                        + " acquired trigger(s).");
                        logWarnIfNonZero(completeCount,
                                "ClusterManager: ......Deleted " + completeCount
                                        + " complete triggers(s).");
                        logWarnIfNonZero(recoveredCount,
                                "ClusterManager: ......Scheduled " + recoveredCount
                                        + " recoverable job(s) for recovery.");
                        logWarnIfNonZero(otherCount,
                                "ClusterManager: ......Cleaned-up " + otherCount
                                        + " other failed job(s).");
    
                        if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                            getDelegate().deleteSchedulerState(conn,
                                    rec.getSchedulerInstanceId());
                        }
                    }
                } catch (Throwable e) {
                    throw new JobPersistenceException("Failure recovering jobs: "
                            + e.getMessage(), e);
                }
            }
        }
    

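    clusterRecover() looks up the fired-trigger records left behind by the current instance or by a failed instance. Triggers that were only acquired are released back to the waiting state, and for jobs that request recovery a new SimpleTriggerImpl is created so the job is scheduled again.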
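
    The per-record branching inside clusterRecover() can be condensed into a small decision function. This sketch mirrors only the acquired / requests-recovery / other branches and the naming scheme of the recovery trigger; the enum, the method names, and the "EXECUTING" state string used in main are illustrative assumptions rather than Quartz API.

```java
class RecoveryDecision {
    enum Action { RELEASE_TO_WAITING, SCHEDULE_RECOVERY, SKIP_JOB_MISSING, CLEAN_UP }

    // Decides what to do with one fired-trigger record of a failed instance.
    public static Action decide(String fireState, boolean requestsRecovery, boolean jobExists) {
        if ("ACQUIRED".equals(fireState)) {
            return Action.RELEASE_TO_WAITING; // never started running: just hand the trigger back
        }
        if (requestsRecovery) {
            // the job asked to be re-run after a crash, but it may have been deleted since
            return jobExists ? Action.SCHEDULE_RECOVERY : Action.SKIP_JOB_MISSING;
        }
        return Action.CLEAN_UP; // no recovery requested: only the fired-trigger record is removed
    }

    // Same naming scheme as the recovery trigger built in clusterRecover().
    public static String recoveryTriggerName(String instanceId, long recoverId) {
        return "recover_" + instanceId + "_" + recoverId;
    }

    public static void main(String[] args) {
        System.out.println(decide("ACQUIRED", true, true));
        System.out.println(decide("EXECUTING", true, true));
        System.out.println(decide("EXECUTING", false, true));
        System.out.println(recoveryTriggerName("node-1", 42L));
    }
}
```

    A job only reaches the recovery branch if it was stored with the requests-recovery flag set; otherwise an interrupted execution is simply dropped.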

    Back to ClusterManager. We have now covered its initialize() method, which also submits the thread itself for execution, so let's look at the run method:

          while (!shutdown) {
    
               if (!shutdown) {
                   long timeToSleep = getClusterCheckinInterval();
                   long transpiredTime = (System.currentTimeMillis() - lastCheckin);
                   timeToSleep = timeToSleep - transpiredTime;
                   if (timeToSleep <= 0) {
                       timeToSleep = 100L;
                   }
    
                   if(numFails > 0) {
                       timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                   }
                   
                   try {
                       Thread.sleep(timeToSleep);
                   } catch (Exception ignore) {
                   }
               }
    
               if (!shutdown && this.manage()) {
                   // something was recovered: signal the scheduler thread to re-evaluate scheduling right away
                   signalSchedulingChangeImmediately(0L);
               }
    
           }//while !shutdown
    

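    This loop sleeps for the remainder of the cluster check-in interval (clusterCheckinInterval minus the time elapsed since the last check-in). If a check-in is already overdue it sleeps only 100 ms, and after a failed check-in it sleeps at least the DB retry interval. It then calls manage() again, and if any instance was recovered it signals the scheduler to re-evaluate scheduling immediately.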
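
    To make the timing concrete, here is the sleep computation from run() extracted into a pure function with a few worked values. The method name and the sample numbers are illustrative; the arithmetic follows the loop shown above.

```java
class CheckinSleep {
    // Mirrors the sleep computation in ClusterManager.run().
    public static long timeToSleep(long checkinIntervalMs, long sinceLastCheckinMs,
                                   int numFails, long dbRetryIntervalMs) {
        long t = checkinIntervalMs - sinceLastCheckinMs; // remainder of the current interval
        if (t <= 0) {
            t = 100L; // already overdue: check in almost immediately
        }
        if (numFails > 0) {
            t = Math.max(dbRetryIntervalMs, t); // back off while the database is failing
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(timeToSleep(15_000L, 2_000L, 0, 15_000L));  // 13000
        System.out.println(timeToSleep(15_000L, 16_000L, 0, 15_000L)); // 100
        System.out.println(timeToSleep(15_000L, 16_000L, 2, 15_000L)); // 15000
    }
}
```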

  • Original article: https://blog.csdn.net/weixin_46399870/article/details/127606427