分布式定时任务系列5:XXL-job中blockingQueue的应用
分布式定时任务系列6:XXL-job触发日志过大引发的CPU告警
好几个月前就打算分析一下XXL-job路由策略的源码,所以有了XXL-job路由策略。不过当时偷懒,只从官网上把介绍贴出来了:
路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;LAST(最后一个):固定选择最后一个机器;ROUND(轮询):;RANDOM(随机):随机选择在线的机器;CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
一般提到路由,可能更多的是理解为对请求做转发时的路由匹配:比如nginx的Location路由,或者SpringCloud组件的Gateway网关上请求Predicate的URL路由匹配。
不过这里说的路由,其实指的在集群条件下对执行器进行的路由选择,是一种负载均衡策略。所以这里假设了一个场景就是,在分布式环境下,有多个执行器组成的集群。这里回顾一下xxl-rpc部署示意图:

执行器集群部署关于调度器集群部署不在此范围暂不讨论,后面会单开一节具体讨论如何集群部署,达到高性能、高可用目的!这里继续引用XXL-job源码分析之任务触发里面关于代码执行的流程
可以看到路由策略的执行代码类路径在:com.xxl.job.admin.core.trigger.XxlJobTrigger ,方法路径在:com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger:
executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
XXL-job定义了策略枚举:
- public enum ExecutorRouteStrategyEnum {
-
- /** FIRST(第一个):固定选择第一个机器; */
- FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
- /** (最后一个):固定选择最后一个机器; */
- LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
- /** (轮询):; */
- ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
- /** (随机):随机选择在线的机器; */
- RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
- /** (一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。 */
- CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
- /** (最不经常使用):使用频率最低的机器优先被选举; */
- LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
- /** (最近最久未使用):最久未使用的机器优先被选举; */
- LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
- /** (故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度; */
- FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
- /** (忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; */
- BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
- /** (分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务; */
- SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
-
- ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
- this.title = title;
- this.router = router;
- }
-
- private String title;
- private ExecutorRouter router;
-
- public String getTitle() {
- return title;
- }
- public ExecutorRouter getRouter() {
- return router;
- }
-
- public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){
- if (name != null) {
- for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {
- if (item.name().equals(name)) {
- return item;
- }
- }
- }
- return defaultItem;
- }
-
- }
其中枚举里面有一个属性router,真正的路由策略实现都在这个接口:com.xxl.job.admin.core.route.ExecutorRouter。
看一看路由接口定义代码:
- public abstract class ExecutorRouter {
- protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);
-
- /**
- * route address
- *
- * @param addressList
- * @return ReturnT.content=address
- */
- public abstract ReturnT
route(TriggerParam triggerParam, List addressList) ; -
- }
而上面的各种策略都实现了这个接口:

这种是典型的策略模式应用,这里也可以看出好的代码通过设计模式可以很方便的做到扩展!
对于这些路由策略实现,从简单到复杂一个个的来解析。
FIRST(第一个)此策略的定义是:固定选择第一个机器!意思就是不论执行器有多少个,始终选择执行器列表的第一个进行任务执行。
这个策略的实现也相当简单:
- public class ExecutorRouteFirst extends ExecutorRouter {
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { - return new ReturnT
(addressList.get(0)); - }
-
- }
这个代码里面就是固定从addressList里面get第一个执行器
此策略的定义是:固定选择最后一个机器!意思就是不论执行器有多少个,始终选择执行器列表的最后一个进行任务执行。
这个策略的实现也相当简单:
- public class ExecutorRouteLast extends ExecutorRouter {
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { - return new ReturnT
(addressList.get(addressList.size()-1)); - }
-
- }
这个代码里面就是固定从addressList里面get最后一个个执行器
此策略的定义是:意思就是不论执行器有多少个,从执行器列表逐个选择进行任务执行。
这个策略的实现如下:
- public class ExecutorRouteRound extends ExecutorRouter {
-
- private static ConcurrentMap
routeCountEachJob = new ConcurrentHashMap<>(); - private static long CACHE_VALID_TIME = 0;
-
- private static int count(int jobId) {
- // cache clear
- if (System.currentTimeMillis() > CACHE_VALID_TIME) {
- routeCountEachJob.clear();
- CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
- }
-
- AtomicInteger count = routeCountEachJob.get(jobId);
- if (count == null || count.get() > 1000000) {
- // 初始化时主动Random一次,缓解首次压力
- count = new AtomicInteger(new Random().nextInt(100));
- } else {
- // count++
- count.addAndGet(1);
- }
- routeCountEachJob.put(jobId, count);
- return count.get();
- }
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { - String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
- return new ReturnT
(address); - }
-
- }
这个代码稍微复杂一点,为了实现轮询的效果在内存中声明了一个Map来来进行计数,其中的key为任务jobId,value为每个任务的调用次数:
这样文字可能理解起来还是不太直接,其实就是类似如下的示例:

此策略的定义是:意思就是不论执行器有多少个,从执行器列表随机选择在线的机器。
这个策略的实现也比较直观:
- public class ExecutorRouteRandom extends ExecutorRouter {
-
- private static Random localRandom = new Random();
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { - String address = addressList.get(localRandom.nextInt(addressList.size()));
- return new ReturnT
(address); - }
-
- }
这个代码里面就是随机从addressList里面get一个执行器:
localRandom.nextInt(addressList.size())
此策略的定义是:意思就是不论执行器有多少个,使用频率最低的机器优先被选择出来进行任务执行。
这个策略的实现如下:
- public class ExecutorRouteLFU extends ExecutorRouter {
-
- // 任务调用计算器,其中key为jobId-任务ID,value为HashMap:记录每个实例的调用次数
- private static ConcurrentMap
> jobLfuMap = new ConcurrentHashMap>(); - private static long CACHE_VALID_TIME = 0;
-
- public String route(int jobId, List
addressList) { -
- // 缓存1天(24小时),然后重新计数
- // cache clear
- if (System.currentTimeMillis() > CACHE_VALID_TIME) {
- jobLfuMap.clear();
- CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
- }
-
- // 初始化
- // lfu item init
- HashMap
lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList; - if (lfuItemMap == null) {
- lfuItemMap = new HashMap
(); - jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖
- }
-
- // put new
- for (String address: addressList) {
- if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
- lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次,缓解首次压力
- }
- }
- // 这里有一个删除动作,其实是因为实例可能动态上下线,对于下线的节点需要排除
- // remove old
- List
delKeys = new ArrayList<>(); - for (String existKey: lfuItemMap.keySet()) {
- if (!addressList.contains(existKey)) {
- delKeys.add(existKey);
- }
- }
- // 移除下线节点,尽量防止调度到下线的节点上导致失败
- if (delKeys.size() > 0) {
- for (String delKey: delKeys) {
- lfuItemMap.remove(delKey);
- }
- }
-
- // 进行调用次数排序
- // load least userd count address
- List
> lfuItemList = new ArrayList>(lfuItemMap.entrySet()); - Collections.sort(lfuItemList, new Comparator
>() { - @Override
- public int compare(Map.Entry
o1, Map.Entry o2) { - return o1.getValue().compareTo(o2.getValue());
- }
- });
-
- // 调用次数+1
- Map.Entry
addressItem = lfuItemList.get(0); - String minAddress = addressItem.getKey();
- addressItem.setValue(addressItem.getValue() + 1);
-
- return addressItem.getKey();
- }
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { - String address = route(triggerParam.getJobId(), addressList);
- return new ReturnT
(address); - }
-
- }
这个代码比较复杂一点, 不过如果对比缓存的淘汰策略的话,这个其实就是所谓的"LFU":
LFU(The Least Frequently Used)最近不多使用算法,与LRU的区别在于LRU是以时间衡量,LFU是以时间段内的次数
算法:若是一个数据在必定时间内被访问的次数很低,那么被认为在将来被访问的几率也是最低的,当规定空间用尽且需要放入新数据的时候,会优先淘汰时间段内访问次数最低的数据。
优势:LFU也能够有效的保护缓存,相对场景来说,比LRU有更好的缓存命中率。由于是以次数为基准,因此更加准确,天然能有效的保证和提升命中率。
缺点:由于LFU须要记录数据的访问频率,所以需要额外的空间;当访问模式改变的时候,算法命中率会急剧降低,这也是他最大弊端
所以这个策略里面为了实现LFU的效果在内存中声明了一个Map来来进行计数,其中的key为任务jobId,value为每个任务的调用次数:
此策略的定义是:意思就是不论执行器有多少个,最久未使用的机器优先被选举。
这个策略的实现如下:
-
- public class ExecutorRouteLRU extends ExecutorRouter {
-
- private static ConcurrentMap
> jobLRUMap = new ConcurrentHashMap>(); - private static long CACHE_VALID_TIME = 0;
-
- public String route(int jobId, List
addressList) { -
- // 缓存1天(24小时),然后重新计数
- // cache clear
- if (System.currentTimeMillis() > CACHE_VALID_TIME) {
- jobLRUMap.clear();
- CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
- }
-
- // init lru
- LinkedHashMap
lruItem = jobLRUMap.get(jobId); - if (lruItem == null) {
- /**
- * LinkedHashMap
- * a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
- * b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
- */
- lruItem = new LinkedHashMap
(16, 0.75f, true); - jobLRUMap.putIfAbsent(jobId, lruItem);
- }
- // 新加入的节点处理:添加到lru中进行统计
- // put new
- for (String address: addressList) {
- if (!lruItem.containsKey(address)) {
- lruItem.put(address, address);
- }
- }
- // 这里有一个删除动作,其实是因为实例可能动态上下线,对于下线的节点需要排除
- // remove old
- List
delKeys = new ArrayList<>(); - for (String existKey: lruItem.keySet()) {
- if (!addressList.contains(existKey)) {
- delKeys.add(existKey);
- }
- }
- if (delKeys.size() > 0) {
- for (String delKey: delKeys) {
- lruItem.remove(delKey);
- }
- }
- // 排序最后一个节点
- // load
- String eldestKey = lruItem.entrySet().iterator().next().getKey();
- String eldestValue = lruItem.get(eldestKey);
- return eldestValue;
- }
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { - String address = route(triggerParam.getJobId(), addressList);
- return new ReturnT
(address); - }
-
- }
这个代码比较复杂一点, 不过如果对比缓存的淘汰策略的话,这个其实就是所谓的"LRU":
LRU(The Least Recently Used)最近最久未使用算法。相比于FIFO算法智能些。
算法:若是一个数据最近不多被访问到,那么被认为在将来被访问的几率也是最低的,当规定空间用尽且需要放入新数据的时候,会优先淘汰最久未被访问的数据。
优势:LRU能够有效的对访问比较频繁的数据进行保护,也就是针对热点数据的命中率提升有明显的效果。
缺点:对于周期性、偶发性的访问数据,有大几率可能形成缓存污染,也就是置换出去了热点数据,把这些偶发性数据留下了,从而致使LRU的数据命中率急剧降低。
所以这个策略里面为了实现LFU的效果在内存中声明了一个Map来来进行计数,其中的key为任务jobId,value为每个任务的调用次数:
- /**
- * Constructs an empty LinkedHashMap instance with the
- * specified initial capacity, load factor and ordering mode.
- *
- * @param initialCapacity the initial capacity
- * @param loadFactor the load factor
- * @param accessOrder the ordering mode - true for
- * access-order, false for insertion-order
- * @throws IllegalArgumentException if the initial capacity is negative
- * or the load factor is nonpositive
- */
- public LinkedHashMap(int initialCapacity,
- float loadFactor,
- boolean accessOrder) {
- super(initialCapacity, loadFactor);
- this.accessOrder = accessOrder;
- }
a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
LinkedHashMap中'accessOrder‘字段的用途是什么?
LinkedHashMap是Java中的一个类,它是HashMap的一个子类,具有HashMap的所有特性,并且还保持了插入顺序或访问顺序的特性。
'accessOrder'字段是LinkedHashMap类中的一个布尔类型的属性,用于指定迭代顺序是否基于访问顺序。当accessOrder为true时,表示迭代顺序将基于最近访问顺序,即最近访问的元素将排在迭代顺序的末尾;当accessOrder为false时,表示迭代顺序将基于插入顺序,即元素将按照插入的顺序进行迭代。
使用accessOrder字段可以方便地实现LRU(Least Recently Used,最近最少使用)缓存淘汰算法。通过将accessOrder设置为true,当访问某个元素时,该元素会被移到链表的末尾,这样在需要淘汰元素时,只需要移除链表头部的元素即可。
LinkedHashMap的应用场景包括但不限于:
- 缓存系统:通过设置accessOrder为true,可以实现基于访问顺序的缓存淘汰策略。
- LRU缓存:通过继承LinkedHashMap并重写removeEldestEntry方法,可以实现固定大小的LRU缓存。
- 记录访问顺序:当需要按照访问顺序记录某些数据时,可以使用LinkedHashMap。
此策略的定义是:按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度。
这个策略的实现如下:
- public class ExecutorRouteFailover extends ExecutorRouter {
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { -
- StringBuffer beatResultSB = new StringBuffer();
- for (String address : addressList) {
- // beat
- ReturnT
beatResult = null; - try {
- ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
- beatResult = executorBiz.beat();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- beatResult = new ReturnT
(ReturnT.FAIL_CODE, ""+e ); - }
- beatResultSB.append( (beatResultSB.length()>0)?"
":"") - .append(I18nUtil.getString("jobconf_beat") + ":")
- .append("
address:").append(address) - .append("
code:").append(beatResult.getCode()) - .append("
msg:").append(beatResult.getMsg()); -
- // beat success
- if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
-
- beatResult.setMsg(beatResultSB.toString());
- beatResult.setContent(address);
- return beatResult;
- }
- }
- return new ReturnT
(ReturnT.FAIL_CODE, beatResultSB.toString()); -
- }
- }
这个策略的实现也比较直观,根据注册的实例列表依次发起心跳检测,如果成功,则选取为执行节点!代码并不复杂,不过可能对FAILOVER(故障转移)这个术语所震撼,觉得很高大上。
对于FAILOVER这种故障处理策略来说,不同的框架或者场景实现不同,难易程度也不同,而且也不是所有的系统/接口适合故障转移。比如对一个有超时机制的微服务架构来说:
如果链路比较多,一个业务请求需要经过A->B->C3个服务,每个服务有2个节点。假设在A服务调用B的时候,存在网络问题,A的实例A1失败,这里转移到A2成功;A->B的时候,B1失败,B2成功;同理B->C,C1失败,C2成功,虽然最终请求成功,但是整体耗时会增大一倍,早已经进过了网关(整体)响应时长导致timeout了,所以有些专题的FAILOVER并不一定是有益甚至有害的(重试也增加了服务调用次数,服务的压力)。关于这一块会后面单独开一节服务故障模式的讨论
此策略的定义是:按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度。
这个策略的实现如下:
- public class ExecutorRouteBusyover extends ExecutorRouter {
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { - StringBuffer idleBeatResultSB = new StringBuffer();
- for (String address : addressList) {
- // beat
- ReturnT
idleBeatResult = null; - try {
- ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
- idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- idleBeatResult = new ReturnT
(ReturnT.FAIL_CODE, ""+e ); - }
- idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"
":"") - .append(I18nUtil.getString("jobconf_idleBeat") + ":")
- .append("
address:").append(address) - .append("
code:").append(idleBeatResult.getCode()) - .append("
msg:").append(idleBeatResult.getMsg()); -
- // beat success
- if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
- idleBeatResult.setMsg(idleBeatResultSB.toString());
- idleBeatResult.setContent(address);
- return idleBeatResult;
- }
- }
-
- return new ReturnT
(ReturnT.FAIL_CODE, idleBeatResultSB.toString()); - }
-
- }
这个策略的实现也比较直观,根据注册的实例列表依次发起空闲检测,如果成功,则选取为执行节点!代码并不复杂,不过可能需要联合起前面XXL-rpc的章节来配合理解
此策略的定义是:每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
这个策略的实现如下:
- /**
- * 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
- * a、virtual node:解决不均衡问题
- * b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围
- * Created by xuxueli on 17/3/10.
- */
- public class ExecutorRouteConsistentHash extends ExecutorRouter {
-
- private static int VIRTUAL_NODE_NUM = 100;
-
- /**
- * get hash code on 2^32 ring (md5散列的方式计算hash值)
- * @param key
- * @return
- */
- private static long hash(String key) {
-
- // md5 byte
- MessageDigest md5;
- try {
- md5 = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException("MD5 not supported", e);
- }
- md5.reset();
- byte[] keyBytes = null;
- try {
- keyBytes = key.getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("Unknown string :" + key, e);
- }
-
- md5.update(keyBytes);
- byte[] digest = md5.digest();
-
- // hash code, Truncate to 32-bits
- long hashCode = ((long) (digest[3] & 0xFF) << 24)
- | ((long) (digest[2] & 0xFF) << 16)
- | ((long) (digest[1] & 0xFF) << 8)
- | (digest[0] & 0xFF);
-
- long truncateHashCode = hashCode & 0xffffffffL;
- return truncateHashCode;
- }
-
- public String hashJob(int jobId, List
addressList) { -
- // ------A1------A2-------A3------
- // -----------J1------------------
- TreeMap
addressRing = new TreeMap(); - for (String address: addressList) {
- for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
- long addressHash = hash("SHARD-" + address + "-NODE-" + i);
- addressRing.put(addressHash, address);
- }
- }
-
- long jobHash = hash(String.valueOf(jobId));
- SortedMap
lastRing = addressRing.tailMap(jobHash); - if (!lastRing.isEmpty()) {
- return lastRing.get(lastRing.firstKey());
- }
- return addressRing.firstEntry().getValue();
- }
-
- @Override
- public ReturnT
route(TriggerParam triggerParam, List addressList) { - String address = hashJob(triggerParam.getJobId(), addressList);
- return new ReturnT
(address); - }
-
- }
这个策略的实现也比较直观,这里就不展开讨论了,有兴趣的可以在评论区晒出理解分享给大家!
上面介绍了各种路由策略的实现,关于这里面的路由策略的选择就不过多讨论了,建议默认情况采用下轮询!
| 序号 | 名称 | 适用场景 |
| 1 | FIRST(第一个) |
|
| 2 | LAST(最后一个) | 这个策略本质上跟FIRST没有区别... |
| 3 | ROUND(轮询) |
|
| 4 | RANDOM(随机) |
|
| 5 | LEAST_FREQUENTLY_USED(最不经常使用) | LFU其实是为了解决多节点利用率的问题,跟下面的LRU类似,不过由于其实现是依赖本地JVM里面的内存操作,可能对CPU占用会高一些,无特殊需求也非首选 |
| 6 | LEAST_RECENTLY_USED(最近最久未使用) | 同LFU |
| 7 | FAILOVER(故障转移) |
|
| 8 | BUSYOVER(忙碌转移) | 这个策略本质上跟FAILOVER没有区别,,只不过它可以根据执行器自身的执行情况反馈的结果进行适当的调整(idleBeat),后面会从注册发现来探讨这个问题 |
| 9 | CONSISTENT_HASH(一致性HASH) | 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;(可以自行看下一致性HASH) |