• xxl-job源码解读:路由策略ExecutorRoute


    本文基于xxl-job的2.3.1版本

    基于源码介绍定时任务的路由选取策略

    基本说明

    路由策略在创建任务时选择,页面上的默认值为【第一个】。

    当任务的执行器路由地址只有一个时,不需要进行路由判断,建议选取策略【第一个】。

    当执行器地址有多个时,会根据选择的路由策略,进行执行路由选择,选取一个地址进行调度。

    其中【分片广播】比较特殊,会对执行器下所有路由进行调用执行。

    代码功能解读

    xxl-job触发器代码主要在 com.xxl.job.admin.core.route 包下。

    路由策略的实现,除了【分片广播】策略的逻辑被写在了触发器中,其他的均定义在com.xxl.job.admin.core.route.strategy

    以下策略介绍不包含分片广播

    包含的路由策略如下图:

    在这里插入图片描述

    路由策略选取

    通过枚举类定义路由策略(除了分片广播)ExecutorRouteStrategyEnum,并将策略类 router与枚举类型进行绑定。

    通过枚举类型直接能获取到对应路由策略的实现类。

    这里可以看到,因为分片广播的逻辑代码放在了触发器 XxlJobTrigger 中,没有对应的实现类,所以为null。

    /**
     * 执行器路由选取策略
     *
     * @author xuxueli on 17/3/10.
     */
    public enum ExecutorRouteStrategyEnum {
    
        FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
        LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
        ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
        RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
        CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
        LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
        LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
        FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
        BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
        SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
    
        ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
            this.title = title;
            this.router = router;
        }
    
        private final String title;
    
        /**
         * 路由策略实现类
         */
        private final ExecutorRouter router;
    
        public String getTitle() {
            return title;
        }
    
        public ExecutorRouter getRouter() {
            return router;
        }
    
        public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem) {
            if (name != null) {
                for (ExecutorRouteStrategyEnum item : ExecutorRouteStrategyEnum.values()) {
                    if (item.name().equals(name)) {
                        return item;
                    }
                }
            }
            return defaultItem;
        }
    
    }
    

    路由策略接口

    这里使用了策略模式,策略的实现类通过实现 ExecutorRouter 抽象类,绑定枚举进行使用。

    如果使用中有什么特殊的选举需求,可以通过实现ExecutorRouter 进行拓展。

    /**
     * 路由策略抽象类
     *
     * @author xuxueli on 17/3/10.
     */
    public abstract class ExecutorRouter {
        protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);
    
        /**
         * route address
         *
         * @param addressList 执行器的路由地址配置
         * @return ReturnT.content=address
         */
        public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);
    
    }
    
    

    路由策略-第一个

    取路由地址数组中第一个

    /**
     * 路由策略-第一个
     *
     * @author xuxueli on 17/3/10.
     */
    public class ExecutorRouteFirst extends ExecutorRouter {
    
        @Override
        public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
            return new ReturnT<>(addressList.get(0));
        }
    
    }
    

    路由策略-最后一个

    取路由地址数组中第最后一个

    /**
     * 路由策略-最后一个
     *
     * @author xuxueli on 17/3/10.
     */
    public class ExecutorRouteLast extends ExecutorRouter {
    
        @Override
        public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
            return new ReturnT<>(addressList.get(addressList.size() - 1));
        }
    
    }
    

    路由策略-轮询

    通过一个 ConcurrentMap 缓存记录任务ID以及执行数字,以执行数字取模作为路由地址下标,选取地址返回。

    每次调用后执行数字 +1,达到路由地址轮询调用的作用。

    缓存每24小时清空一次

    /**
     * 路由策略-轮询
     *
     * @author xuxueli on 17/3/10.
     */
    public class ExecutorRouteRound extends ExecutorRouter {
    
        /**
         * 轮询记录 JobId -> 调用次数递增数字(初始值为100以内的随机数, 每次调用递增, 用于取模作为路由地址数组index, 决定当次调用路由)
         */
        private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
        private static long CACHE_VALID_TIME = 0;
    
        private static int count(int jobId) {
            // cache clear 每24小时清空一次 轮询记录缓存
            if (System.currentTimeMillis() > CACHE_VALID_TIME) {
                routeCountEachJob.clear();
                CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24;
            }
    
            AtomicInteger count = routeCountEachJob.get(jobId);
            if (count == null || count.get() > 1000000) {
                // 初始化时主动Random一次,缓解首次压力
                count = new AtomicInteger(new Random().nextInt(100));
            } else {
                // count++ 加一并返回
                count.addAndGet(1);
            }
            routeCountEachJob.put(jobId, count);
            return count.get();
        }
    
        @Override
        public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
            String address = addressList.get(count(triggerParam.getJobId()) % addressList.size());
            return new ReturnT<>(address);
        }
    
    }
    

    路由策略-随机

    通过 Random 进行随机数获取,决定路由下标返回地址

    /**
     * 路由策略-随机
     * 

    随机选择在线的机器

    * * @author xuxueli on 17/3/10. */
    public class ExecutorRouteRandom extends ExecutorRouter { private static final Random localRandom = new Random(); @Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = addressList.get(localRandom.nextInt(addressList.size())); return new ReturnT<>(address); } }

    路由策略-一致性Hash

    通过一致性Hash,让每个Job固定调用其中一台机器,具体哪台机器根据hash值的范围选取。

    同时通过循环,扩充hash的集合大小,以保证分组下机器分配job足够平均。

    /**
     * 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
     * a、virtual node:解决不均衡问题
     * b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围
     *
     * @author xuxueli on 17/3/10.
     */
    public class ExecutorRouteConsistentHash extends ExecutorRouter {
    
        private static int VIRTUAL_NODE_NUM = 100;
    
        /**
         * get hash code on 2^32 ring (md5散列的方式计算hash值)
         *
         * @param key
         * @return
         */
        private static long hash(String key) {
    
            // md5 byte
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new RuntimeException("MD5 not supported", e);
            }
            md5.reset();
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    
            md5.update(keyBytes);
            byte[] digest = md5.digest();
    
            // hash code, Truncate to 32-bits
            long hashCode = ((long) (digest[3] & 0xFF) << 24)
                    | ((long) (digest[2] & 0xFF) << 16)
                    | ((long) (digest[1] & 0xFF) << 8)
                    | (digest[0] & 0xFF);
    
            return hashCode & 0xffffffffL;
        }
    
        public String hashJob(int jobId, List<String> addressList) {
    
            // ------A1------A2-------A3------
            // -----------J1------------------
            TreeMap<Long, String> addressRing = new TreeMap<>();
            // 每个复制循环100次,取hash值,让map足够大,以保证分组下机器分配JOB足够平均
            for (String address : addressList) {
                for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                    long addressHash = hash("SHARD-" + address + "-NODE-" + i);
                    addressRing.put(addressHash, address);
                }
            }
    
            long jobHash = hash(String.valueOf(jobId));
            // 根据jobId的hash结果排序,获取出大于等于该结果的部分,存在则取这部分的第一条,不存在则选取第一个
            SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
            if (!lastRing.isEmpty()) {
                return lastRing.get(lastRing.firstKey());
            }
            return addressRing.firstEntry().getValue();
        }
    
        @Override
        public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
            String address = hashJob(triggerParam.getJobId(), addressList);
            return new ReturnT<>(address);
        }
    
    }
    

    路由策略-最不经常使用

    使用 ConcurrentMap> jobLfuMap 记录每个任务ID在路由地址下的调用次数,选取调用次数最小的路由地址。

    记录的缓存每24小时清空一次。

    依照实现逻辑,如果运行中修改配置,给执行器添加了个路由地址,短时间内调度会全部涌入新地址中。高频任务调度的情况下需要注意下。

    /**
     * 路由策略-LFU(Least Frequently Used):最不经常使用,频率/次数
     * 

    * 单个JOB对应的每个执行器,使用频率最低的优先被选举 * * @author xuxueli on 17/3/10. */ public class ExecutorRouteLFU extends ExecutorRouter { /** * JobId -> [路由地址 -> 调用次数] */ private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<>(); private static long CACHE_VALID_TIME = 0; public String route(int jobId, List<String> addressList) { // cache clear if (System.currentTimeMillis() > CACHE_VALID_TIME) { jobLfuMap.clear(); CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24; } // lfu item init HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList; if (lfuItemMap == null) { lfuItemMap = new HashMap<>(); jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖 } // put new for (String address : addressList) { // 新地址,或者已经超过一百万,使用随机数进行初始化。设置上限避免无上限的递增 if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) > 1000000) { lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次,缓解首次压力 } } // remove old List<String> delKeys = new ArrayList<>(); for (String existKey : lfuItemMap.keySet()) { if (!addressList.contains(existKey)) { delKeys.add(existKey); } } if (delKeys.size() > 0) { for (String delKey : delKeys) { lfuItemMap.remove(delKey); } } // load least userd count address List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<>(lfuItemMap.entrySet()); // 根据value进行排序,即根据调用次数进行正序排序,第一条则为在24小时内,调用次数最少的(不考虑随机数的差值影响) // 排序代码可以简化为 lfuItemList.sort(Entry.comparingByValue()); -- java8 Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() { @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return o1.getValue().compareTo(o2.getValue()); } }); Map.Entry<String, Integer> addressItem = lfuItemList.get(0); String minAddress = addressItem.getKey(); addressItem.setValue(addressItem.getValue() + 1); return addressItem.getKey(); } @Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = route(triggerParam.getJobId(), addressList); return new ReturnT<>(address); } }

    路由策略-最近最久未使用

    利用LinkedHashMapaccessOrder = true 时,访问顺序排序的特性,进行路由选取

    记录的缓存每24小时清空一次。

    对于那种一天及以上才执行一次的任务,在地址配置不变的情况下,会一直调度第一个路由地址。

    /**
     * 路由策略-LRU(Least Recently Used):最近最久未使用,时间
     * 

    * 单个JOB对应的每个执行器,最久未使用的优先被选举 * * @author xuxueli on 17/3/10. */ public class ExecutorRouteLRU extends ExecutorRouter { /** * JobId -> [路由地址 -> 路由地址] * 用LinkedHashMap是为了使用accessOrder参数特性, 其中的value没有用 */ private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<>(); private static long CACHE_VALID_TIME = 0; public String route(int jobId, List<String> addressList) { // cache clear 每天清空一次缓存 if (System.currentTimeMillis() > CACHE_VALID_TIME) { jobLRUMap.clear(); CACHE_VALID_TIME = System.currentTimeMillis() + 1000 * 60 * 60 * 24; } // init lru LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId); if (lruItem == null) { /** * LinkedHashMap * a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期; * b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法; */ lruItem = new LinkedHashMap<>(16, 0.75f, true); jobLRUMap.putIfAbsent(jobId, lruItem); } // 更新map里面的地址,先插入新增地址,再移除已经不在配置中的地址 // put new for (String address : addressList) { if (!lruItem.containsKey(address)) { lruItem.put(address, address); } } // remove old List<String> delKeys = new ArrayList<>(); for (String existKey : lruItem.keySet()) { if (!addressList.contains(existKey)) { delKeys.add(existKey); } } if (delKeys.size() > 0) { for (String delKey : delKeys) { lruItem.remove(delKey); } } // load 获取排序的第一条数据(即最久未使用) String eldestKey = lruItem.entrySet().iterator().next().getKey(); // 由于 accessOrder 设置为了true,会根据访问方法get重新排序: get之后元素被排序到最后,达到根据调度时间正序排序的效果 String eldestValue = lruItem.get(eldestKey); return eldestValue; } @Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = route(triggerParam.getJobId(), addressList); return new ReturnT<>(address); } }

    路由策略-故障转移

    按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度

    /**
     * 路由策略-故障转移
     * 

    按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度

    * * @author xuxueli on 17/3/10. */
    public class ExecutorRouteFailover extends ExecutorRouter { @Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { StringBuffer beatResultSB = new StringBuffer(); for (String address : addressList) { // beat ReturnT<String> beatResult; try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); beatResult = executorBiz.beat(); } catch (Exception e) { logger.error(e.getMessage(), e); beatResult = new ReturnT<>(ReturnT.FAIL_CODE, "" + e); } beatResultSB.append((beatResultSB.length() > 0) ? "

    "
    : "") .append(I18nUtil.getString("jobconf_beat") + ":") .append("
    address:"
    ).append(address) .append("
    code:"
    ).append(beatResult.getCode()) .append("
    msg:"
    ).append(beatResult.getMsg()); // beat success if (beatResult.getCode() == ReturnT.SUCCESS_CODE) { beatResult.setMsg(beatResultSB.toString()); beatResult.setContent(address); return beatResult; } } return new ReturnT<>(ReturnT.FAIL_CODE, beatResultSB.toString()); } }

    路由策略-忙碌转移

    按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度

    /**
     * 路由策略-忙碌转移
     *
     * @author xuxueli on 17/3/10.
     */
    public class ExecutorRouteBusyover extends ExecutorRouter {
    
        @Override
        public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
            StringBuffer idleBeatResultSB = new StringBuffer();
            for (String address : addressList) {
                // beat
                ReturnT<String> idleBeatResult;
                try {
                    ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                    idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    idleBeatResult = new ReturnT<>(ReturnT.FAIL_CODE, "" + e);
                }
                idleBeatResultSB.append((idleBeatResultSB.length() > 0) ? "

    "
    : "") .append(I18nUtil.getString("jobconf_idleBeat") + ":") .append("
    address:"
    ).append(address) .append("
    code:"
    ).append(idleBeatResult.getCode()) .append("
    msg:"
    ).append(idleBeatResult.getMsg()); // beat success if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) { idleBeatResult.setMsg(idleBeatResultSB.toString()); idleBeatResult.setContent(address); return idleBeatResult; } } return new ReturnT<>(ReturnT.FAIL_CODE, idleBeatResultSB.toString()); } }
  • 相关阅读:
    计算机组成原理浮点数表示
    8位ADC是256还是255?
    springboot simple (6) springboot thrift
    剑指 Offer II 114+115+116+117+LC827
    r语言plot函数
    【架构师视角系列】QConfig配置中心系列之架构设计(一)
    CocosCreator-3.6 三步解决2D碰撞监听
    如何做好供应商绩效管理?
    金堂县中医医院二期扩建项目建设进入收尾阶段
    东北大学acm暑期夏令营指针与引用初步
  • 原文地址:https://blog.csdn.net/Azhuzhu_chaste/article/details/126940104