• Scheduled Task Series (9): Core Principles of Quartz Startup, Misfire Handling


    Going back to the start of the previous chapter, where we analyzed clustering, there was one more piece of code:

            misfireHandler = new MisfireHandler();
            if(initializersLoader != null)
                misfireHandler.setContextClassLoader(initializersLoader);
            misfireHandler.initialize();
            schedulerRunning = true;
    

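    This snippet creates the MisfireHandler thread. Like the cluster manager, MisfireHandler is an inner class of JobStoreSupport, and, as with the cluster initialization, its initialize() method also starts the thread.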
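    To make that lifecycle concrete, here is a minimal sketch of the same pattern as a standalone class: a thread whose initialize() starts it, and whose run loop keeps going until a volatile shutdown flag is set. SketchMisfireHandler and its 50 ms placeholder pass are hypothetical, not Quartz's actual class; the sketch simply calls start() and does not reproduce how Quartz launches its thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for JobStoreSupport's MisfireHandler.
class SketchMisfireHandler extends Thread {
    // volatile so the loop sees a shutdown() call made from another thread
    private volatile boolean shutdown = false;
    // counts loop passes; a placeholder for one misfire scan
    final AtomicInteger passes = new AtomicInteger();

    SketchMisfireHandler() {
        setName("SketchMisfireHandler");
        setDaemon(true); // don't keep the JVM alive just for this thread
    }

    // In this sketch, initialize() just starts the thread.
    void initialize() {
        start();
    }

    void shutdown() {
        shutdown = true;
        interrupt(); // wake the thread if it is currently sleeping
    }

    @Override
    public void run() {
        while (!shutdown) {
            passes.incrementAndGet(); // one "scan"
            try {
                Thread.sleep(50L);
            } catch (InterruptedException ignore) {
                // loop around and re-check the shutdown flag
            }
        }
    }
}
```

    The interrupt in shutdown() matters because the real loop may be sleeping for up to the whole misfire threshold; without it, shutdown would wait for the sleep to finish.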
    Let's go straight to its run method:

        @Override
        public void run() {
            while (!shutdown) {

                long sTime = System.currentTimeMillis();
                // Find and recover misfired triggers
                RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

                if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
                    // Tell the scheduler thread that fire times have changed
                    signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
                }

                if (!shutdown) {
                    long timeToSleep = 50L;  // At least a short pause to help balance threads
                    if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                        timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                        if (timeToSleep <= 0) {
                            timeToSleep = 50L;
                        }

                        if (numFails > 0) {
                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                        }
                    }

                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }
            } // while (!shutdown)
        }
    

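    Each pass first calls manage() to find the misfired triggers and reschedule them; each trigger's misfire instruction decides its new fire time. If any triggers were processed, the scheduler thread is signaled with the earliest new fire time.
    For example: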

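    • MISFIRE_INSTRUCTION_FIRE_ONCE_NOW: fire once immediately
    • MISFIRE_INSTRUCTION_DO_NOTHING: skip the missed firing and wait for the next scheduled fire time
    • MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY: never treat the trigger as misfired; the scheduler fires it as soon as it can, catching up on the missed fire times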
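    As a rough illustration of how these instructions differ, the sketch below models a hypothetical trigger that fires every intervalMs starting at startMs. It is a simplified model, not Quartz's updateAfterMisfire(); real triggers such as CronTrigger compute "the next time after now" from their own schedule and Calendar.

```java
// Simplified model (not Quartz code) of what each misfire instruction does to
// the next fire time of a hypothetical fixed-interval schedule.
final class MisfirePolicySketch {
    enum Instruction { FIRE_ONCE_NOW, DO_NOTHING, IGNORE_MISFIRE_POLICY }

    private MisfirePolicySketch() {}

    // First scheduled time strictly after afterMs.
    static long fireTimeAfter(long startMs, long intervalMs, long afterMs) {
        if (afterMs < startMs) {
            return startMs;
        }
        long periods = (afterMs - startMs) / intervalMs + 1;
        return startMs + periods * intervalMs;
    }

    static long nextFireTime(Instruction instruction, long missedFireTimeMs,
                             long startMs, long intervalMs, long nowMs) {
        switch (instruction) {
            case FIRE_ONCE_NOW:
                return nowMs; // run once right away
            case DO_NOTHING:
                return fireTimeAfter(startMs, intervalMs, nowMs); // skip to the next slot
            case IGNORE_MISFIRE_POLICY:
            default:
                return missedFireTimeMs; // keep the past time: fire ASAP and catch up
        }
    }
}
```

    With a 60 s interval, a firing missed at 60,000 ms and the handler running at 185,000 ms, FIRE_ONCE_NOW yields 185,000, DO_NOTHING skips ahead to 240,000, and IGNORE_MISFIRE_POLICY keeps 60,000, so the trigger fires right away.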

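    It then sleeps before the next scan. If more misfired triggers remain, the pause is only 50 ms; otherwise it sleeps for whatever is left of the misfire threshold after this pass (never less than 50 ms), and after database failures (numFails > 0) for at least the DB retry interval.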
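    The sleep rule is easier to see as a pure function. This sketch copies the arithmetic from the run loop above; the class and parameter names are assumptions, and in Quartz the inputs come from getMisfireThreshold(), numFails and getDbRetryInterval().

```java
// Sketch of the run loop's sleep calculation, extracted so it can be tested.
final class MisfireSleep {
    static final long MIN_PAUSE_MS = 50L; // "at least a short pause"

    private MisfireSleep() {}

    static long timeToSleep(boolean hasMoreMisfiredTriggers,
                            long misfireThresholdMs,
                            long elapsedMs,
                            int numFails,
                            long dbRetryIntervalMs) {
        long timeToSleep = MIN_PAUSE_MS; // more work pending: come back quickly
        if (!hasMoreMisfiredTriggers) {
            // Wait out the rest of one threshold window, measured from the start of this pass.
            timeToSleep = misfireThresholdMs - elapsedMs;
            if (timeToSleep <= 0) {
                timeToSleep = MIN_PAUSE_MS;
            }
            // After database failures, back off for at least the retry interval.
            if (numFails > 0) {
                timeToSleep = Math.max(dbRetryIntervalMs, timeToSleep);
            }
        }
        return timeToSleep;
    }
}
```

    For example, with a 60 s threshold (the documented default of org.quartz.jobStore.misfireThreshold) and a pass that took 100 ms, the handler sleeps 59,900 ms. Note that the DB back-off only applies when no misfired triggers are left over.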

    Now let's see how Quartz finds the misfired triggers, going straight to doRecoverMisfires(), which manage() calls:

        protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
            boolean transOwner = false;
            Connection conn = getNonManagedTXConnection();
            try {
                RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
                
                // Before we make the potentially expensive call to acquire the 
                // trigger lock, peek ahead to see if it is likely we would find
                // misfired triggers requiring recovery.
            // If nothing has misfired, we can skip taking the lock altogether.
                int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
                    getDelegate().countMisfiredTriggersInState(
                        conn, STATE_WAITING, getMisfireTime()) : 
                    Integer.MAX_VALUE;
                
                if (misfireCount == 0) {
                    getLog().debug(
                        "Found 0 triggers that missed their scheduled fire-time.");
                } else {
                    transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
                // With the lock held, recover the misfired triggers
                    result = recoverMisfiredJobs(conn, false);
                }
                
                commitConnection(conn);
                return result;
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                throw e;
            } catch (SQLException e) {
                rollbackConnection(conn);
                throw new JobPersistenceException("Database error recovering from misfires.", e);
            } catch (RuntimeException e) {
                rollbackConnection(conn);
                throw new JobPersistenceException("Unexpected runtime exception: "
                        + e.getMessage(), e);
            } finally {
                try {
                    releaseLock(LOCK_TRIGGER_ACCESS, transOwner);
                } finally {
                    cleanupConnection(conn);
                }
            }
        }
    

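    Before taking the lock, it first counts the WAITING triggers that have misfired (only when getDoubleCheckLockMisfireHandler() is true; otherwise it assumes there may be some). Acquiring the trigger lock is expensive, so this check avoids it whenever there is nothing to recover. Once it holds the lock, it recovers the misfired triggers: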
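    The check-then-lock idea is a general pattern. In the sketch below, a ReentrantLock stands in for the database lock (LOCK_TRIGGER_ACCESS) and two IntSupplier callbacks stand in for countMisfiredTriggersInState and recoverMisfiredJobs; all names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntSupplier;

// Sketch of "peek before locking": a cheap unlocked count decides whether the
// expensive lock is worth taking. Not Quartz code.
final class PeekThenLock {
    private final ReentrantLock triggerAccess = new ReentrantLock();
    private int lockAcquisitions = 0;

    int lockAcquisitions() {
        return lockAcquisitions;
    }

    // doubleCheck plays the role of getDoubleCheckLockMisfireHandler().
    int recover(boolean doubleCheck, IntSupplier countMisfires, IntSupplier recoverUnderLock) {
        // Without the double check, assume there may be work.
        int misfireCount = doubleCheck ? countMisfires.getAsInt() : Integer.MAX_VALUE;
        if (misfireCount == 0) {
            return 0; // nothing to do: the lock is never touched
        }
        triggerAccess.lock();
        try {
            lockAcquisitions++;
            // The peek was only a hint; the real work re-checks under the lock.
            return recoverUnderLock.getAsInt();
        } finally {
            triggerAccess.unlock();
        }
    }
}
```

    The unlocked count can be stale by the time the lock is taken, which is why recovery must query again while holding the lock rather than trusting the peek.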

        protected RecoverMisfiredJobsResult recoverMisfiredJobs(
            Connection conn, boolean recovering)
            throws JobPersistenceException, SQLException {
    
            // If recovering, we want to handle all of the misfired
            // triggers right away.
            int maxMisfiresToHandleAtATime = 
                (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
            
            List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
            long earliestNewTime = Long.MAX_VALUE;
            // We must still look for the MISFIRED state in case triggers were left 
            // in this state when upgrading to this version that does not support it.
            // Roughly the SQL this issues (the table prefix and timestamp appear to come from the author's setup):
            // SELECT TRIGGER_NAME, TRIGGER_GROUP FROM t_rb_qrtz_TRIGGERS WHERE SCHED_NAME = 'scheduler'
            //   AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < '1662464980466' AND TRIGGER_STATE = 'WAITING'
            //   ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
            boolean hasMoreMisfiredTriggers =
                getDelegate().hasMisfiredTriggersInState(
                    conn, STATE_WAITING, getMisfireTime(), 
                    maxMisfiresToHandleAtATime, misfiredTriggers);
    
            if (hasMoreMisfiredTriggers) {
                getLog().info(
                    "Handling the first " + misfiredTriggers.size() +
                    " triggers that missed their scheduled fire-time.  " +
                    "More misfired triggers remain to be processed.");
            } else if (misfiredTriggers.size() > 0) { 
                getLog().info(
                    "Handling " + misfiredTriggers.size() + 
                    " trigger(s) that missed their scheduled fire-time.");
            } else {
                getLog().debug(
                    "Found 0 triggers that missed their scheduled fire-time.");
                return RecoverMisfiredJobsResult.NO_OP; 
            }
    
            for (TriggerKey triggerKey: misfiredTriggers) {
                
                OperableTrigger trig = 
                    retrieveTrigger(conn, triggerKey);
    
                if (trig == null) {
                    continue;
                }
                // This is where the misfired trigger is actually updated
                doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);
    
                if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
                    earliestNewTime = trig.getNextFireTime().getTime();
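            }

            return new RecoverMisfiredJobsResult(
                    hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
        }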
    

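    It queries the misfired WAITING triggers again, this time under the lock and limited to maxMisfiresToHandleAtATime (no limit when recovering). It then loads each trigger as an OperableTrigger, updates its next fire time, and records the earliest new fire time across all of them.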
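    The selection rule can be modeled in memory. This sketch uses a hypothetical TriggerRow type and mirrors the SQL comment in the code above (leaving out the SCHED_NAME filter). It assumes getMisfireTime() returns the current time minus the misfire threshold, floored at 0, as in Quartz's JobStoreSupport, so a trigger counts as misfired only once it is late by more than the threshold.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of the misfire query; not Quartz code.
final class MisfireScan {
    // Hypothetical row holding the columns the query looks at.
    static final class TriggerRow {
        final String name;
        final String state;
        final int misfireInstr;
        final long nextFireTime;
        final int priority;

        TriggerRow(String name, String state, int misfireInstr, long nextFireTime, int priority) {
            this.name = name;
            this.state = state;
            this.misfireInstr = misfireInstr;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
        }
    }

    private MisfireScan() {}

    // Cutoff: now minus the threshold, never below 0.
    static long misfireTime(long nowMs, long misfireThresholdMs) {
        long misfireTime = nowMs;
        if (misfireThresholdMs > 0) {
            misfireTime -= misfireThresholdMs;
        }
        return (misfireTime > 0) ? misfireTime : 0;
    }

    // Adds up to maxCount names to an initially empty 'out' (-1 = no limit) and
    // returns true if at least one more misfired trigger was left over.
    static boolean findMisfired(List<TriggerRow> rows, long misfireTime,
                                int maxCount, List<String> out) {
        List<TriggerRow> candidates = new ArrayList<>();
        for (TriggerRow r : rows) {
            // TRIGGER_STATE = 'WAITING' AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < cutoff
            if ("WAITING".equals(r.state) && r.misfireInstr != -1
                    && r.nextFireTime < misfireTime) {
                candidates.add(r);
            }
        }
        // ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
        candidates.sort((a, b) -> a.nextFireTime != b.nextFireTime
                ? Long.compare(a.nextFireTime, b.nextFireTime)
                : Integer.compare(b.priority, a.priority));
        for (TriggerRow r : candidates) {
            if (maxCount >= 0 && out.size() == maxCount) {
                return true; // more rows than we may handle in this pass
            }
            out.add(r.name);
        }
        return false;
    }
}
```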

        private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
            Calendar cal = null;
            if (trig.getCalendarName() != null) {
                cal = retrieveCalendar(conn, trig.getCalendarName());
            }
    
            schedSignaler.notifyTriggerListenersMisfired(trig);
            // Apply the trigger's misfire instruction (updates its next fire time)
            trig.updateAfterMisfire(cal);
    
            if (trig.getNextFireTime() == null) {
                storeTrigger(conn, trig,
                    null, true, STATE_COMPLETE, forceState, recovering);
                schedSignaler.notifySchedulerListenersFinalized(trig);
            } else {
                storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                        forceState, recovering);
            }
        }
    

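    It first notifies the trigger listeners of the misfire, then calls updateAfterMisfire(cal) so the trigger applies its misfire instruction, and finally writes the trigger back to the database: as COMPLETE (also notifying scheduler listeners that it is finalized) if it has no next fire time, otherwise in the given state, which is WAITING here.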
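    The decision logic of doUpdateOfMisfiredTrigger can be sketched with in-memory stand-ins. MiniTrigger, FixedTrigger, the event list and the state map are hypothetical; they only record the order of side effects and the state each trigger would be stored in.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal model of the misfire update flow; not Quartz code.
final class MisfireUpdateSketch {
    interface MiniTrigger {
        String name();
        void updateAfterMisfire(); // applies the trigger's misfire instruction
        Long nextFireTime();       // null means it will never fire again
    }

    // Example trigger whose misfire update sets a fixed (possibly null) next time.
    static final class FixedTrigger implements MiniTrigger {
        private final String name;
        private final Long nextAfterMisfire;
        private Long next;

        FixedTrigger(String name, Long next, Long nextAfterMisfire) {
            this.name = name;
            this.next = next;
            this.nextAfterMisfire = nextAfterMisfire;
        }

        @Override public String name() { return name; }
        @Override public void updateAfterMisfire() { next = nextAfterMisfire; }
        @Override public Long nextFireTime() { return next; }
    }

    final List<String> events = new ArrayList<>();          // order of notifications
    final Map<String, String> storedState = new HashMap<>(); // stand-in for the DB

    void updateMisfired(MiniTrigger trig, String newStateIfNotComplete) {
        events.add("misfired:" + trig.name());        // like notifyTriggerListenersMisfired
        trig.updateAfterMisfire();
        if (trig.nextFireTime() == null) {
            storedState.put(trig.name(), "COMPLETE"); // store as complete...
            events.add("finalized:" + trig.name());   // ...then announce it is finalized
        } else {
            storedState.put(trig.name(), newStateIfNotComplete);
        }
    }
}
```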

  • Original article: https://blog.csdn.net/weixin_46399870/article/details/127857801