• How Schedulerx2.0 Distributed Task Scheduling Works


    1. Introduction

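    Schedulerx2.0 is an akka-based distributed task scheduling framework developed by Alibaba. It provides distributed execution, multiple task types, unified logging, and related features. Users only need to depend on the schedulerx-worker jar; with the programming model that Schedulerx2.0 provides, a few lines of code are enough to build a highly reliable, operable distributed execution engine. This article explains how schedulerx-worker works.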

    2. Overall Architecture

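    Schedulerx2.0 is a centralized scheduling framework made up of a Server and Workers. The Server triggers and schedules tasks and submits them to Workers through its dispatch engine. The Workers execute the tasks.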

    A Worker is divided into three layers: TaskMaster, Container, and Processor:

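    • TaskMaster: similar to yarn's AppMaster. It supports an extensible distributed execution framework, manages the whole lifecycle of a jobInstance and the resources of its containers, and also provides capabilities such as failover. The default implementations are StandaloneTaskMaster (standalone execution), BroadcastTaskMaster (broadcast execution), MapTaskMaster (parallel computing, memory grid, grid computing), and MapReduceTaskMaster (parallel computing, memory grid, grid computing).
    • Container: the container framework that runs business logic. It supports thread/process/docker/actor and so on.
    • Processor: the business logic framework. Different processors represent different task types.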

    3. Task Execution Flow: Worker Side

    The Server triggers a scheduled run and submits the task to a Worker. The Worker receives the task submitted by the Server through JobInstanceActor:

    public class JobInstanceActor extends UntypedActor {
        private TaskMasterPool masterPool;
        private LogCollector logCollector;
        private static final Logger LOGGER = LogFactory.getLogger(JobInstanceActor.class);

        public JobInstanceActor() {
            this.masterPool = TaskMasterPool.INSTANCE;
            this.logCollector = LogCollectorFactory.get();
        }

        // Dispatch each incoming message to its handler based on the message type
        public void onReceive(Object obj) throws Throwable {
            if (obj instanceof ServerSubmitJobInstanceRequest) {
                this.handleSubmitJobInstance((ServerSubmitJobInstanceRequest)obj);
            } else if (obj instanceof ServerKillJobInstanceRequest) {
                this.handleKillJobInstance((ServerKillJobInstanceRequest)obj);
            } else if (obj instanceof ServerRetryTasksRequest) {
                this.handleRetryTasks((ServerRetryTasksRequest)obj);
            } else if (obj instanceof ServerKillTaskRequest) {
                this.handleKillTask((ServerKillTaskRequest)obj);
            } else if (obj instanceof ServerCheckTaskMasterRequest) {
                this.handCheckTaskMaster((ServerCheckTaskMasterRequest)obj);
            } else if (obj instanceof MasterNotifyWorkerPullRequest) {
                this.handleInitPull((MasterNotifyWorkerPullRequest)obj);
            } else if (obj instanceof ServerThreadDumpRequest) {
                this.handleThreadDump((ServerThreadDumpRequest)obj);
            } else if (obj instanceof ServerPushLogConfigRequest) {
                this.handlePushLogConfig((ServerPushLogConfigRequest)obj);
            }
        }
        ......
    }

    The message class for task execution is ServerSubmitJobInstanceRequest. When the Worker receives this message, it hands it to the function handleSubmitJobInstance:

    private void handleSubmitJobInstance(ServerSubmitJobInstanceRequest request) {
        ServerSubmitJobInstanceResponse response = null;
        // The instance is already running: report the failure and return
        // (the declaration of errMsg is not shown in this excerpt)
        if (this.masterPool.contains(request.getJobInstanceId())) {
            this.logCollector.collect(IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), ClientLoggerMessage.appendMessage("server trigger client fail.", new String[]{errMsg}));
            response = ServerSubmitJobInstanceResponse.newBuilder().setSuccess(false).setMessage(errMsg).build();
            this.getSender().tell(response, this.getSelf());
        } else {
            // Acknowledge the submission first, then start the work
            response = ServerSubmitJobInstanceResponse.newBuilder().setSuccess(true).build();
            this.getSender().tell(response, this.getSelf());
            try {
                JobInstanceInfo jobInstanceInfo = this.convet2JobInstanceInfo(request);
                // Create the TaskMaster object that matches the task type
                TaskMaster taskMaster = TaskMasterFactory.create(jobInstanceInfo, this.getContext());
                this.masterPool.put(jobInstanceInfo.getJobInstanceId(), taskMaster);
                // Submit the task for execution
                taskMaster.submitInstance(jobInstanceInfo);
                ......
            } catch (Throwable var5) {
                ......
            }
        }
    }
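
The duplicate check at the top of handleSubmitJobInstance can be illustrated without akka. What follows is a minimal sketch, not the schedulerx-worker implementation: MasterPoolSketch, trySubmit, and release are hypothetical names, and the master is represented by a plain String. The original calls contains and then put, which is safe as long as a single actor instance handles these messages, because an actor processes one message at a time. The sketch uses putIfAbsent so that the same guarantee holds under plain multithreading.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for TaskMasterPool: at most one master per running job instance. */
class MasterPoolSketch {
    private final ConcurrentMap<Long, String> masters = new ConcurrentHashMap<>();

    /**
     * Registers a master for the instance unless one is already registered.
     * Returns true when the submission is accepted, false when it is a duplicate.
     */
    boolean trySubmit(long jobInstanceId, String master) {
        // putIfAbsent is atomic, so two concurrent submissions cannot both win.
        return masters.putIfAbsent(jobInstanceId, master) == null;
    }

    /** Removes the master once the instance has finished, so a later run is accepted again. */
    void release(long jobInstanceId) {
        masters.remove(jobInstanceId);
    }
}
```

A second trySubmit with the same jobInstanceId returns false until release is called, which corresponds to the "server trigger client fail." branch above.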

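    handleSubmitJobInstance creates a TaskMaster object to run the task. TaskMaster is an abstract class with several subclasses, one for each distributed programming model. This article uses a Map task as the example to walk through the execution flow.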

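    A map-model job offers three execution modes: parallel computing, memory grid, and grid computing:

    • Parallel computing: up to 300 subtasks, with a subtask list.
    • Memory grid: up to 50,000 subtasks, no subtask list, fast.
    • Grid computing: up to 1,000,000 subtasks, no subtask list.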
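
The three limits above can be read as a rule of thumb for picking a mode. The helper below is only an illustration of that rule: MapModeSketch, Mode, and suggest are hypothetical names that do not exist in schedulerx-worker, and treating each limit as inclusive is an assumption.

```java
/** Hypothetical helper that maps a subtask count to the smallest mode that can hold it. */
class MapModeSketch {
    enum Mode { PARALLEL_COMPUTING, MEMORY_GRID, GRID_COMPUTING }

    static Mode suggest(long subTaskCount) {
        // Counts beyond the largest documented limit are rejected instead of guessed at.
        if (subTaskCount < 0 || subTaskCount > 1_000_000L) {
            throw new IllegalArgumentException("unsupported subtask count: " + subTaskCount);
        }
        if (subTaskCount <= 300L) {
            return Mode.PARALLEL_COMPUTING; // small job, subtask list is available
        }
        if (subTaskCount <= 50_000L) {
            return Mode.MEMORY_GRID;        // no subtask list, fast
        }
        return Mode.GRID_COMPUTING;         // no subtask list, largest capacity
    }
}
```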

    3.1 TaskMaster

    MapTaskMaster, a subclass of TaskMaster, is defined as follows:

    public abstract class MapTaskMaster extends TaskMaster {
        protected volatile int pageSize = ConfigUtil.getWorkerConfig().getInt("map.master.page.size", 100);
        protected volatile int queueSize = ConfigUtil.getWorkerConfig().getInt("map.master.queue.size", 10000);
        private volatile int dispatcherSize = ConfigUtil.getWorkerConfig().getInt("map.master.dispatcher.size", 5);
        protected ReqQueue taskStatusReqQueue;
        protected TMStatusReqHandler taskStatusReqBatchHandler;
        // Holds the tasks generated by map
        protected ReqQueue taskBlockingQueue;
        // Holds the tasks about to be forwarded and forwards them to the workers for execution
        protected TaskDispatchReqHandler taskDispatchReqHandler;
        private volatile String rootTaskResult;
        protected TaskPersistence taskPersistence;
        protected Map taskProgressMap = Maps.newConcurrentMap();
        protected Map workerProgressMap = Maps.newConcurrentMap();
        private Map taskResultMap = Maps.newHashMap();
        private Map taskStatusMap = Maps.newHashMap();
        ......
    }
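
The pageSize and queueSize fields suggest a bounded queue that a handler drains in pages. The sketch below shows that pattern with the JDK only. It is an assumption about the general shape, not the actual ReqQueue or TaskDispatchReqHandler code: BatchQueueSketch, offer, and nextBatch are hypothetical names, and tasks are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical bounded task queue that is consumed in batches of at most pageSize. */
class BatchQueueSketch {
    private final BlockingQueue<String> queue;
    private final int pageSize;

    BatchQueueSketch(int queueSize, int pageSize) {
        this.queue = new LinkedBlockingQueue<>(queueSize);
        this.pageSize = pageSize;
    }

    /** Adds a task without blocking; returns false when the queue is full. */
    boolean offer(String task) {
        return queue.offer(task);
    }

    /** Removes up to pageSize tasks in FIFO order; returns an empty list when nothing is queued. */
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>(pageSize);
        queue.drainTo(batch, pageSize);
        return batch;
    }
}
```

With queueSize 10 and pageSize 4, ten queued tasks come out as batches of 4, 4, and 2. A periodic handler would call nextBatch on each tick and forward the batch in one request.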

    The submitInstance function of MapTaskMaster is as follows:

    public void submitInstance(JobInstanceInfo jobInstanceInfo) throws Exception {
        try {
            ......
            // Initialize the objects and start the tasks in the thread pool
            this.startBatchHandler();
            this.createRootTask();
            this.init();
        } catch (Throwable var4) {
            ......
        }
    }

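    1. startBatchHandler mainly performs initialization and starts the periodically executed tasks in the thread pool.

    2. createRootTask creates a root task; this is the task that maps out the subtasks.

    3. init mainly starts the periodically executed tasks in the thread pool.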

    The code of createRootTask is as follows:

    protected void createRootTask() throws Exception {
        String taskName = "MAP_TASK_ROOT";
        ByteString taskBody = ByteString.copyFrom(HessianUtil.toBytes("MAP_TASK_ROOT"));
        // Initialize the task counter: the current job has one task running
        this.initTaskProgress(taskName, 1);
        // Convert the parameters into a MasterStartContainerRequest
        MasterStartContainerRequest startContainerRequest = this.convert2StartContainerRequest(this.jobInstanceInfo, this.aquireTaskId(), taskName, taskBody);
        // Dispatch the root task to the local Worker for execution
        this.batchDispatchTasks(Lists.newArrayList(new MasterStartContainerRequest[]{startContainerRequest}), this.getLocalWorkerIdAddr());
    }
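
To make the role of the root task concrete, here is a small single-process simulation of the map model: the root task produces subtasks, and each subtask then produces a result. This is a sketch under simplifying assumptions. Apart from the task name MAP_TASK_ROOT, every name in it (MapModelSketch, Task, process, run, SUB_TASK) is hypothetical, and the real framework spreads the subtasks over several workers instead of looping over a local queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-process simulation of a root task that maps out subtasks. */
class MapModelSketch {
    static final String ROOT_TASK_NAME = "MAP_TASK_ROOT";

    /** A task is identified by its name and carries an arbitrary body. */
    static final class Task {
        final String name;
        final Object body;

        Task(String name, Object body) {
            this.name = name;
            this.body = body;
        }
    }

    /** The root task returns new subtasks; any other task records a result and returns nothing. */
    static List<Task> process(Task task, int shards, List<String> results) {
        List<Task> children = new ArrayList<>();
        if (ROOT_TASK_NAME.equals(task.name)) {
            for (int i = 0; i < shards; i++) {
                children.add(new Task("SUB_TASK", i));
            }
        } else {
            results.add(task.name + ":" + task.body);
        }
        return children;
    }

    /** Starts from the root task and keeps processing until no task is pending. */
    static List<String> run(int shards) {
        Deque<Task> pending = new ArrayDeque<>();
        List<String> results = new ArrayList<>();
        pending.add(new Task(ROOT_TASK_NAME, ROOT_TASK_NAME));
        while (!pending.isEmpty()) {
            pending.addAll(process(pending.poll(), shards, results));
        }
        return results;
    }
}
```

Calling MapModelSketch.run(3) processes one root task and three subtasks and returns the three subtask results in submission order.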

    The function batchDispatchTasks assigns tasks to the individual Workers; a root task is assigned to the local Worker:

    public void batchDispatchTasks(List masterStartContainerRequests, String remoteWorker) {
        Map<String, List<MasterStartContainerRequest>> worker2ReqsWithNormal = Maps.newHashMap();
        Map<String, List<MasterStartContainerRequest>> worker2ReqsWithFailover = Maps.newHashMap();
        this.batchHandlePulledProgress(masterStartContainerRequests, worker2ReqsWithNormal, worker2ReqsWithFailover, remoteWorker);
        Iterator var5 = worker2ReqsWithNormal.entrySet().iterator();
        Entry entry;
        while(var5.hasNext()) {
            entry = (Entry)var5.next();
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), false, TaskDispatchMode.PUSH);
        }
        var5 = worker2ReqsWithFailover.entrySet().iterator();
        while(var5.hasNext()) {
            entry = (Entry)var5.next();
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), true, TaskDispatchMode.PUSH);
        }
    }

    The function batchHandleContainers writes the tasks to the local H2 database and forwards them to the corresponding worker.

    private void batchHandleContainers(final String workerIdAddr, final List reqs, boolean isFailover, TaskDispatchMode dispatchMode) {
        ......
        try {
            // Write the tasks to the local H2 database with status running
            this.batchHandlePersistence(workerId, workerAddr, reqs, isFailover);
            if (dispatchMode.equals(TaskDispatchMode.PUSH)) {
                // Send the tasks to the corresponding worker
                final long startTime = System.currentTimeMillis();
                ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
                MasterBatchStartContainersRequest request = MasterBatchStartContainersRequest.newBuilder()
                    .setJobInstanceId(this.jobInstanceInfo.getJobInstanceId())
                    .setJobId(this.jobInstanceInfo.getJobId())
                    .addAllStartReqs(reqs)
                    .build();
                // Ask the remote router and wait at most 15 seconds for its reply
                Timeout timeout = new Timeout(Duration.create(15L, TimeUnit.SECONDS));
                Future future = Patterns.ask(selection, request, timeout);
                ......
            }
        } catch (Throwable var13) {
            ......
        }
    }
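
The order of the two steps matters: the tasks are persisted before they are sent, so the master still has a record of them if the request to the worker fails or times out. The sketch below reproduces that order with the JDK's CompletableFuture in place of akka's ask pattern (orTimeout requires Java 9 or later). PersistThenSendSketch and dispatch are hypothetical names, an in-memory list stands in for the H2 table, and what the original does with the returned Future is elided in the excerpt, so the sketch simply returns it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical illustration of "persist first, then send with a reply timeout". */
class PersistThenSendSketch {
    // In-memory stand-in for the local task table.
    final List<String> store = new ArrayList<>();

    CompletableFuture<String> dispatch(List<String> tasks,
                                       Function<List<String>, CompletableFuture<String>> worker,
                                       long timeout, TimeUnit unit) {
        // Step 1: record the tasks locally before anything leaves this process.
        store.addAll(tasks);
        // Step 2: send the batch; the future fails with TimeoutException if no reply arrives in time.
        return worker.apply(tasks).orTimeout(timeout, unit);
    }
}
```

If the worker never answers, the returned future completes exceptionally with a TimeoutException while the tasks stay in the store, which is the state a failover routine would need in order to resend them.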

    3.2 Container

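    Container is the container framework that runs business logic. Tasks that the TaskMaster forwards to a worker are executed by the Container module. ContainerRoutingActor is a routing Actor that contains multiple ContainerActors and forwards each message it receives to one of them. ContainerActor is defined as follows: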

    public class ContainerActor extends UntypedActor {
        public void onReceive(Object obj) throws Throwable {
            if (obj instanceof MasterStartContainerRequest) {
                this.handleStartContainer((MasterStartContainerRequest)obj);
            } else if (obj instanceof MasterBatchStartContainersRequest) {
                this.handleBatchStartContainers((MasterBatchStartContainersRequest)obj);
            } else if (obj instanceof MasterKillContainerRequest) {
                this.handleKillContainer((MasterKillContainerRequest)obj);
            } else if (obj instanceof MasterDestroyContainerPoolRequest) {
                this.handleDestroyContainerPool((MasterDestroyContainerPoolRequest)obj);
            }
        }
    }
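
The routing step, in which one router hands each message to exactly one of several handlers, can be sketched in plain Java. The article does not say which routing strategy ContainerRoutingActor uses, so the round-robin choice below is an assumption made for illustration, and RouterSketch and route are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical round-robin router: each message goes to exactly one routee. */
class RouterSketch<M> {
    private final List<Consumer<M>> routees;
    private final AtomicInteger next = new AtomicInteger();

    RouterSketch(List<Consumer<M>> routees) {
        if (routees.isEmpty()) {
            throw new IllegalArgumentException("at least one routee is required");
        }
        this.routees = new ArrayList<>(routees);
    }

    /** Delivers the message to the next routee in turn and returns that routee's index. */
    int route(M message) {
        // floorMod keeps the index valid even after the counter overflows to a negative value.
        int index = Math.floorMod(next.getAndIncrement(), routees.size());
        routees.get(index).accept(message);
        return index;
    }
}
```

With three routees, four consecutive messages land on routees 0, 1, 2, and 0 again.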

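    When a message of type MasterBatchStartContainersRequest arrives, it is dispatched to handleBatchStartContainers, and the function startContainer is called to do the actual work: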

    private String startContainer(MasterStartContainerRequest request) throws Exception {
        String uniqueId = IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId());
        JobContext context = ContanerUtil.convert2JobContext(request);
        Container container = ContainerFactory.create(context);
        if (container != null) {
            this.containerPool.submit(context.getJobId(), context.getJobInstanceId(), context.getTaskId(), container, consumerNum);
        }
        ......
    }

    Finally, submit hands the task to the container pool, which executes it.
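
For the thread-based case, a container pool can be pictured as a fixed-size thread pool that runs each task's business logic and reports the result under the task's unique id. The sketch below is a simplified picture, not the real containerPool: ContainerPoolSketch is a hypothetical name, the processor is reduced to a Supplier, the id format in the usage note is made up, and using consumerNum as the number of threads is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical thread-based container pool with consumerNum worker threads. */
class ContainerPoolSketch implements AutoCloseable {
    private final ExecutorService threads;

    ContainerPoolSketch(int consumerNum) {
        this.threads = Executors.newFixedThreadPool(consumerNum);
    }

    /** Runs the processor on a pool thread and tags its result with the task's unique id. */
    CompletableFuture<String> submit(String uniqueId, Supplier<String> processor) {
        return CompletableFuture.supplyAsync(() -> uniqueId + "=" + processor.get(), threads);
    }

    /** Stops accepting new tasks; tasks that are already running are allowed to finish. */
    @Override
    public void close() {
        threads.shutdown();
    }
}
```

For example, pool.submit("1_2_3", () -> "SUCCESS").join() yields "1_2_3=SUCCESS" once a pool thread has run the supplier.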

    Original article: https://blog.csdn.net/bao2901203013/article/details/126567295