Distributed task scheduling with Schedulerx2.0: how a Map-model job executes


    Schedulerx2.0 provides several distributed programming models. This article uses a Map-model job to walk through its execution flow.

    I. Example

    A sample Map job:

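    import com.alibaba.schedulerx.worker.domain.JobContext;
    import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
    import com.alibaba.schedulerx.worker.processor.ProcessResult;
    import com.google.common.collect.Lists;
    import java.util.List;
    import org.springframework.stereotype.Component;

    @Component
    public class TestMapJobProcessor extends MapJobProcessor {
        @Override
        public ProcessResult process(JobContext context) throws Exception {
            String taskName = context.getTaskName();
            int dispatchNum = 50;
            if (isRootTask(context)) {
                // Root task: build the messages and fan them out as child tasks
                System.out.println("start root task");
                List<String> msgList = Lists.newArrayList();
                for (int i = 0; i <= dispatchNum; i++) {
                    msgList.add("msg_" + i);
                }
                return map(msgList, "Level1Dispatch");
            } else if ("Level1Dispatch".equals(taskName)) {
                // Child task: each one carries a single message
                String task = (String) context.getTask();
                System.out.println(task);
                return new ProcessResult(true);
            }
            return new ProcessResult(false);
        }
    }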

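    When a worker receives the job, it runs process. The first invocation is the root task (isRootTask returns true), so the processor builds a list of child tasks and enters the map flow.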
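    To see the root/child split without a SchedulerX cluster, the following in-memory sketch replays the same two levels on a single thread: a root task that fans out Level1Dispatch children, then each child handling one message. LocalMapRunner, its Task type and the ROOT name are hypothetical; in SchedulerX the children are dispatched to workers rather than kept in a local deque.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory runner mimicking the root-task / child-task split of TestMapJobProcessor. */
final class LocalMapRunner {
    static final String ROOT = "ROOT";             // hypothetical name for the root task
    static final String LEVEL1 = "Level1Dispatch"; // child task name used in the example

    static final class Task {
        final String name;
        final Object body;

        Task(String name, Object body) {
            this.name = name;
            this.body = body;
        }
    }

    /** Runs the root task plus every child it produces; returns child payloads in processing order. */
    static List<Object> run(int dispatchNum) {
        Deque<Task> pending = new ArrayDeque<>();
        List<Object> processed = new ArrayList<>();
        pending.add(new Task(ROOT, null));
        while (!pending.isEmpty()) {
            Task task = pending.poll();
            if (ROOT.equals(task.name)) {
                // Root task: fan out child tasks, like map(msgList, "Level1Dispatch")
                for (int i = 0; i <= dispatchNum; i++) {
                    pending.add(new Task(LEVEL1, "msg_" + i));
                }
            } else if (LEVEL1.equals(task.name)) {
                // Child task: handle one message, like the else-if branch of process
                processed.add(task.body);
            }
        }
        return processed;
    }
}
```

    Because the example loops with i <= dispatchNum, dispatchNum = 50 yields 51 child tasks (msg_0 through msg_50).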

    II. The map flow

    The code of MapJobProcessor#map is as follows:

    public ProcessResult map(List taskList, String taskName) {
        ProcessResult result = new ProcessResult(false);
        JobContext context = ContainerFactory.getContainerPool().getContext();
        // Look up the TaskMaster actor of this job instance
        ActorSelection masterAkkaSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
        if (masterAkkaSelection == null) {
            ......
            result.setResult(errMsg);
            return result;
        } else if (CollectionUtils.isEmpty(taskList)) {
            return result;
        } else {
            // Send the tasks in batches of batchSize
            int batchSize = ConfigUtil.getWorkerConfig().getInt("worker.map.page.size", 1000);
            int size = taskList.size();
            int quotient = size / batchSize;
            int remainder = size % batchSize;
            int batchNumber = remainder > 0 ? quotient + 1 : quotient;
            List<WorkerMapTaskRequest.Builder> builders = Lists.newArrayList();
            for (int i = 0; i < batchNumber; ++i) {
                builders.add(WorkerMapTaskRequest.newBuilder());
            }
            int maxTaskBodySize = ConfigUtil.getWorkerConfig().getInt("task.body.size.max", 65536);
            try {
                // Serialize each task with Hessian and append it to its batch
                int position = 0;
                for (Object task : taskList) {
                    this.checkTaskObject(task);
                    int batchIdx = position++ / batchSize;
                    byte[] taskBody = HessianUtil.toBytes(task);
                    builders.get(batchIdx).addTaskBody(ByteString.copyFrom(taskBody));
                }
                // Send the batches one at a time
                position = 0;
                for (WorkerMapTaskRequest.Builder builder : builders) {
                    builder.setJobId(context.getJobId());
                    builder.setJobInstanceId(context.getJobInstanceId());
                    builder.setTaskId(context.getTaskId());
                    builder.setTaskName(taskName);
                    WorkerMapTaskResponse response = null;
                    byte retryCount = 0;
                    try {
                        TaskMaster taskMaster = TaskMasterPool.INSTANCE.get(context.getJobInstanceId());
                        if (taskMaster != null && taskMaster instanceof MapTaskMaster) {
                            // The TaskMaster runs on this worker: handle the map sub-tasks locally
                            response = this.handleMapTask(taskMaster, builder.build());
                        } else {
                            // Otherwise forward the batch to the remote TaskMaster actor
                            response = (WorkerMapTaskResponse) FutureUtils.awaitResult(masterAkkaSelection, builder.build(), 30L);
                        }
                    } catch (TimeoutException e) {
                        ......
                    }
                    if (!response.getSuccess()) {
                        LOGGER.error(response.getMessage());
                        this.logCollector.collect(context.getUniqueId(), response.getMessage());
                        result.setResult(response.getMessage());
                        return result;
                    }
                    // Drop the reference to the batch that has just been sent
                    builders.set(position++, null);
                    if (response.hasOverload() && response.getOverload()) {
                        LOGGER.warn("Task Master is busy, sleeping a while {}s...", new Object[]{10});
                        Thread.sleep(10000L);
                    }
                }
                result.setStatus(true);
            } catch (Throwable t) {
                ......
            }
            return result;
        }
    }

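    map first splits the task list into batches of batchSize (worker.map.page.size, default 1000), serializing each task with Hessian, and then sends the batches one at a time. If the job instance's TaskMaster lives on the local worker, a batch is handled locally by handleMapTask; otherwise it is sent to the TaskMaster actor and the worker waits for the response via FutureUtils.awaitResult (timeout argument 30L). If the response reports overload, the worker sleeps 10 seconds before sending the next batch.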
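    The batching at the start of map is a ceiling division followed by assigning the i-th task to batch i / batchSize. The sketch below isolates just that arithmetic; MapBatcher is a hypothetical helper with no SchedulerX types, and it keeps tasks as plain objects instead of Hessian-serialized bytes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper reproducing the batch arithmetic of MapJobProcessor#map. */
final class MapBatcher {
    /** Number of batches: size / batchSize, plus one if there is a remainder. */
    static int batchCount(int size, int batchSize) {
        int quotient = size / batchSize;
        int remainder = size % batchSize;
        return remainder > 0 ? quotient + 1 : quotient;
    }

    /** Splits tasks into consecutive batches; task i goes to batch i / batchSize. */
    static <T> List<List<T>> partition(List<T> tasks, int batchSize) {
        int batchNumber = batchCount(tasks.size(), batchSize);
        List<List<T>> batches = new ArrayList<>(batchNumber);
        for (int i = 0; i < batchNumber; i++) {
            batches.add(new ArrayList<>());
        }
        int position = 0;
        for (T task : tasks) {
            batches.get(position++ / batchSize).add(task);
        }
        return batches;
    }
}
```

    With the example's 51 messages and the default batch size of 1000, everything fits in a single batch; 2,500 tasks would be sent as batches of 1000, 1000 and 500.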

    MapJobProcessor#handleMapTask

    private WorkerMapTaskResponse handleMapTask(TaskMaster taskMaster, WorkerMapTaskRequest request) throws Exception {
        WorkerMapTaskResponse response = null;
        try {
            long jobInstanceId = request.getJobInstanceId();
            if (taskMaster != null) {
                if (!(taskMaster instanceof MapTaskMaster)) {
                    ......
                } else {
                    try {
                        long startTime = System.currentTimeMillis();
                        // Delegate to MapTaskMaster#map, which reports whether the master is overloaded
                        boolean overload = ((MapTaskMaster) taskMaster).map(request.getTaskBodyList(), request.getTaskName());
                        response = WorkerMapTaskResponse.newBuilder().setSuccess(true).setOverload(overload).build();
                    } catch (Exception e) {
                        ......
                    }
                }
            } else {
                ......
            }
        } catch (Throwable t) {
            ......
        }
        return response;
    }
    handleMapTask passes the batch to MapTaskMaster#map:
    MapTaskMaster#map
    public boolean map(List taskList, String taskName) throws Exception {
        this.initTaskProgress(taskName, taskList.size());
        for (Object item : taskList) {
            ByteString taskBody = (ByteString) item;
            // Build a start-container request for this task
            MasterStartContainerRequest startContainerRequest = this.convert2StartContainerRequest(this.jobInstanceInfo, this.aquireTaskId(), taskName, taskBody);
            // Put the request on taskBlockingQueue
            this.taskBlockingQueue.submitRequest(startContainerRequest);
        }
        return this.machineOverload();
    }

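    MapTaskMaster#map only writes the tasks to taskBlockingQueue and returns machineOverload(); the tasks are taken off taskBlockingQueue and dispatched asynchronously.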
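    This is a producer/consumer hand-off: map returns as soon as the tasks are queued, and a separate thread later drains the queue in batches. A minimal sketch using java.util.concurrent.LinkedBlockingQueue is below. TaskQueueSketch and its method names are hypothetical, and ReqQueue's own API and its behavior when full are not reproduced; this sketch simply stops accepting tasks once the bounded queue is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the map -> dispatch hand-off through a bounded queue. */
final class TaskQueueSketch {
    private final BlockingQueue<String> taskBlockingQueue;

    TaskQueueSketch(int capacity) {
        this.taskBlockingQueue = new LinkedBlockingQueue<>(capacity);
    }

    /** Producer side (compare MapTaskMaster#map): enqueue without waiting; returns how many were accepted. */
    int map(List<String> tasks) {
        int accepted = 0;
        for (String task : tasks) {
            if (!taskBlockingQueue.offer(task)) {
                break; // queue full: this sketch stops here instead of blocking
            }
            accepted++;
        }
        return accepted;
    }

    /** Consumer side: drain up to batchSize queued tasks in FIFO order, as a dispatcher thread would. */
    List<String> retrieveBatch(int batchSize) {
        List<String> batch = new ArrayList<>(batchSize);
        taskBlockingQueue.drainTo(batch, batchSize);
        return batch;
    }
}
```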

    III. ParallelTaskMater

    ParallelTaskMater is a subclass of MapTaskMaster, defined as follows:

    public class ParallelTaskMater extends MapTaskMaster {
        private static final Logger LOGGER = LogFactory.getLogger(ParallelTaskMater.class);
        private LogCollector logCollector = LogCollectorFactory.get();
        private static final Integer BATCH_SIZE = 256;

        public ParallelTaskMater(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
            super(jobInstanceInfo, actorContext);
            this.taskPersistence = new ServerTaskPersistence(jobInstanceInfo.getGroupId());
            long jobInstanceId = jobInstanceInfo.getJobInstanceId();
            this.taskStatusReqQueue = new ReqQueue(jobInstanceId, 1024);
            this.taskStatusReqBatchHandler = new TMStatusReqHandler(jobInstanceId, 1, 1, BATCH_SIZE * 2 * jobInstanceInfo.getAllWorkers().size(), this.taskStatusReqQueue);
            // Queue of tasks waiting to be dispatched
            this.taskBlockingQueue = new ReqQueue(jobInstanceId, BATCH_SIZE * 4);
            if (jobInstanceInfo.getXattrs() != null) {
                this.xAttrs = (MapTaskXAttrs) JsonUtil.fromJson(jobInstanceInfo.getXattrs(), MapTaskXAttrs.class);
            }
            if (this.xAttrs != null && this.xAttrs.getTaskDispatchMode().equals(TaskDispatchMode.PULL.getValue())) {
                // Handler that periodically dispatches tasks (PULL mode)
                this.taskDispatchReqHandler = new TaskPullReqHandler(jobInstanceId, 1, 2, BATCH_SIZE * jobInstanceInfo.getAllWorkers().size(), this.taskBlockingQueue);
            } else {
                // PUSH mode (the default)
                this.taskDispatchReqHandler = new TaskPushReqHandler(jobInstanceId, 1, 2, BATCH_SIZE * jobInstanceInfo.getAllWorkers().size(), this.taskBlockingQueue, BATCH_SIZE);
            }
        }
        ......
    }

    taskDispatchReqHandler is the handler that dispatches tasks; MapTaskMaster's startup method startBatchHandler starts it.

    TaskPushReqHandler extends TaskDispatchReqHandler, which in turn extends BaseReqHandler. BaseReqHandler's start method is:

    BaseReqHandler#start
    public void start() {
        ......
        this.batchRetrieveThread = new Thread(new Runnable() {
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        List reqs = BaseReqHandler.this.asyncHandleReqs();
                        BaseReqHandler.LOGGER.debug("jobInstanceId={}, batch retrieve reqs, size:{}, remain size:{}, batchSize:{}", new Object[]{BaseReqHandler.this.jobInstanceId, reqs.size(), BaseReqHandler.this.reqsQueue.size(), BaseReqHandler.this.batchSize});
                        if ((float) reqs.size() >= (float) BaseReqHandler.this.batchSize * 0.8F) {
                            continue; // nearly full batch: poll again immediately
                        }
                        if (reqs.isEmpty()) {
                            Thread.sleep(BaseReqHandler.this.emptySleepMs); // nothing queued
                        } else {
                            Thread.sleep(BaseReqHandler.this.defaultSleepMs); // partial batch
                        }
                    } catch (InterruptedException e) {
                        return; // interrupted: stop the thread
                    } catch (Throwable t) {
                        BaseReqHandler.LOGGER.error(t);
                        return; // any other error is logged and also ends the thread
                    }
                }
            }
        }, this.batchRetrieveThreadName + this.jobInstanceId);
        this.batchRetrieveThread.start();
    }
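    The back-off rule in the loop above can be pulled out into a pure function to make the thresholds explicit: a batch of at least 80% of batchSize means the queue is busy, so the thread polls again immediately; an empty batch means it sleeps emptySleepMs; anything in between sleeps defaultSleepMs. PollBackoff is a hypothetical helper, and the article does not show the actual values of emptySleepMs or defaultSleepMs.

```java
/** Hypothetical helper: how long BaseReqHandler's loop sleeps after one poll. */
final class PollBackoff {
    static long sleepMsAfterPoll(int retrieved, int batchSize, long emptySleepMs, long defaultSleepMs) {
        if ((float) retrieved >= (float) batchSize * 0.8F) {
            return 0L;             // queue is busy: poll again right away
        }
        if (retrieved == 0) {
            return emptySleepMs;   // nothing queued: back off
        }
        return defaultSleepMs;     // partial batch: short pause
    }
}
```

    Polling again immediately while batches are nearly full lets the handler keep up with bursts, while the sleeps keep an idle job instance from spinning.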

    On each pass of the loop, the thread calls asyncHandleReqs:

    private synchronized List asyncHandleReqs() {
        List reqs = this.reqsQueue.retrieveRequests(this.batchSize);
        if (!reqs.isEmpty()) {
            this.activeRunnableNum.incrementAndGet();
            this.process(this.jobInstanceId, reqs);
        }
        return reqs;
    }

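    asyncHandleReqs takes up to batchSize requests from the queue and, if it got any, increments activeRunnableNum and passes them to process.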

    TaskPushReqHandler#process
    public void process(long jobInstanceId, List reqs, String workerAddr) {
        this.batchProcessSvc.submit(new TaskPushReqHandler.BatchTasksDispatchRunnable(jobInstanceId, reqs));
    }

    private class BatchTasksDispatchRunnable implements Runnable {
        private long jobInstanceId;
        private List reqs;

        BatchTasksDispatchRunnable(long jobInstanceId, List reqs) {
            this.jobInstanceId = jobInstanceId;
            this.reqs = reqs;
        }

        public void run() {
            try {
                // Hand the batch to this job instance's MapTaskMaster
                ((MapTaskMaster) TaskPushReqHandler.this.taskMasterPool.get(this.jobInstanceId)).batchDispatchTasks(this.reqs);
            } catch (Throwable t) {
                TaskPushReqHandler.LOGGER.error(t);
            } finally {
                TaskPushReqHandler.this.activeRunnableNum.decrementAndGet();
            }
        }
    }
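    process does not dispatch on the polling thread: it submits a BatchTasksDispatchRunnable to batchProcessSvc, and activeRunnableNum counts batches in flight (incremented in asyncHandleReqs, decremented in the runnable's finally block). A self-contained sketch of that pattern with a plain ExecutorService follows; BatchDispatcher is hypothetical, and its Consumer stands in for MapTaskMaster#batchDispatchTasks.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical sketch: hand each retrieved batch to a pool and count batches still in flight. */
final class BatchDispatcher<T> {
    final AtomicInteger activeRunnableNum = new AtomicInteger();
    private final ExecutorService batchProcessSvc;
    private final Consumer<List<T>> dispatch; // stands in for batchDispatchTasks

    BatchDispatcher(int threads, Consumer<List<T>> dispatch) {
        this.batchProcessSvc = Executors.newFixedThreadPool(threads);
        this.dispatch = dispatch;
    }

    /** Called from the polling thread; returns right after submitting the batch. */
    void process(List<T> reqs) {
        if (reqs.isEmpty()) {
            return;
        }
        activeRunnableNum.incrementAndGet(); // counterpart of asyncHandleReqs' increment
        batchProcessSvc.submit(() -> {
            try {
                dispatch.accept(reqs);
            } catch (RuntimeException e) {
                System.err.println("dispatch failed: " + e); // log and carry on
            } finally {
                activeRunnableNum.decrementAndGet(); // always runs, even if dispatch throws
            }
        });
    }

    /** Stops accepting batches and waits for in-flight ones; returns true if all finished in time. */
    boolean shutdownAndWait(long timeoutMs) {
        batchProcessSvc.shutdown();
        try {
            return batchProcessSvc.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

    Decrementing in finally keeps the counter accurate even when a dispatch fails, so it stays usable as an "is anything still in flight" signal.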
    BatchTasksDispatchRunnable calls batchDispatchTasks to dispatch the tasks in bulk:
    MapTaskMaster#batchDispatchTasks
    public void batchDispatchTasks(List<MasterStartContainerRequest> masterStartContainerRequests, String remoteWorker) {
        // Normal tasks, grouped by target worker
        Map<String, List<MasterStartContainerRequest>> worker2ReqsWithNormal = Maps.newHashMap();
        // Failover tasks (tasks whose earlier execution failed), grouped by target worker
        Map<String, List<MasterStartContainerRequest>> worker2ReqsWithFailover = Maps.newHashMap();
        this.batchHandlePulledProgress(masterStartContainerRequests, worker2ReqsWithNormal, worker2ReqsWithFailover, remoteWorker);
        // Dispatch the normal tasks
        for (Map.Entry<String, List<MasterStartContainerRequest>> entry : worker2ReqsWithNormal.entrySet()) {
            this.batchHandleContainers(entry.getKey(), entry.getValue(), false, TaskDispatchMode.PUSH);
        }
        // Dispatch the failover tasks
        for (Map.Entry<String, List<MasterStartContainerRequest>> entry : worker2ReqsWithFailover.entrySet()) {
            this.batchHandleContainers(entry.getKey(), entry.getValue(), true, TaskDispatchMode.PUSH);
        }
    }

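    batchDispatchTasks splits the requests into normal tasks and failover tasks (tasks that previously failed), grouped by target worker, and then calls batchHandleContainers once per worker to send them.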
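    How batchHandlePulledProgress picks a worker for each request is elided in the article. To illustrate only the shape of its output (two maps keyed by worker address, one for normal and one for failover requests), the sketch below assumes simple round-robin assignment and a hypothetical Req type carrying a failover flag; neither is taken from SchedulerX.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: split dispatch requests into normal/failover groups keyed by worker. */
final class DispatchGrouping {
    static final class Req {
        final long taskId;
        final boolean failover; // true if the task is being re-dispatched after a failure

        Req(long taskId, boolean failover) {
            this.taskId = taskId;
            this.failover = failover;
        }
    }

    /** Assigns workers round-robin (an assumption) and files each request under normal or failover. */
    static void group(List<Req> reqs, List<String> workers,
                      Map<String, List<Req>> normal, Map<String, List<Req>> failover) {
        int next = 0;
        for (Req req : reqs) {
            String worker = workers.get(next++ % workers.size());
            Map<String, List<Req>> target = req.failover ? failover : normal;
            target.computeIfAbsent(worker, k -> new ArrayList<>()).add(req);
        }
    }
}
```

    Keeping the two groups separate lets the caller pass isFailover = true only for the re-dispatched tasks, as batchDispatchTasks does.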

    MapTaskMaster#batchHandleContainers

    private void batchHandleContainers(final String workerIdAddr, final List<MasterStartContainerRequest> reqs, boolean isFailover, TaskDispatchMode dispatchMode) {
        ......
        try {
            // Write the task status to the H2 database
            this.batchHandlePersistence(workerId, workerAddr, reqs, isFailover);
            // PUSH mode: push the tasks to the target worker
            if (dispatchMode.equals(TaskDispatchMode.PUSH)) {
                final long startTime = System.currentTimeMillis();
                ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
                MasterBatchStartContainersRequest request = MasterBatchStartContainersRequest.newBuilder()
                        .setJobInstanceId(this.jobInstanceInfo.getJobInstanceId())
                        .setJobId(this.jobInstanceInfo.getJobId())
                        .addAllStartReqs(reqs)
                        .build();
                Timeout timeout = new Timeout(Duration.create(15L, TimeUnit.SECONDS));
                // Send the batch to the worker asynchronously
                Future<Object> future = Patterns.ask(selection, request, timeout);
                future.onSuccess(new OnSuccess<Object>() {
                    public void onSuccess(Object obj) throws Throwable {
                        MasterBatchStartContainersResponse response = (MasterBatchStartContainersResponse) obj;
                        if (response.getSuccess()) {
                            ......
                        } else {
                            // The worker rejected the batch: mark every task FAILED in the database
                            for (MasterStartContainerRequest req : reqs) {
                                ContainerReportTaskStatusRequest failStatusRequest = ContainerReportTaskStatusRequest.newBuilder()
                                        .setJobId(MapTaskMaster.this.jobInstanceInfo.getJobId())
                                        .setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId())
                                        .setTaskId(req.getTaskId())
                                        .setStatus(TaskStatus.FAILED.getValue())
                                        .setWorkerId(workerId)
                                        .setWorkerAddr(workerAddr)
                                        .setTaskName(req.getTaskName())
                                        .build();
                                MapTaskMaster.this.updateTaskStatus(failStatusRequest);
                            }
                        }
                    }
                }, this.futureExecutor);
                future.onFailure(new OnFailure() {
                    public void onFailure(Throwable e) throws Throwable {
                        if (e instanceof TimeoutException) {
                            // Dispatch timed out: reset the tasks to INIT so they are pulled again later
                            List<Long> taskIds = Lists.newArrayList();
                            for (MasterStartContainerRequest req : reqs) {
                                taskIds.add(req.getTaskId());
                            }
                            try {
                                int affectCnt = MapTaskMaster.this.taskPersistence.updateTaskStatus(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), taskIds, TaskStatus.INIT, workerId, workerAddr);
                                ((WorkerProgressCounter) MapTaskMaster.this.workerProgressMap.get(workerAddr)).decrementRunning(affectCnt);
                            } catch (Exception ex) {
                                MapTaskMaster.LOGGER.error("jobInstanceId={}, timeout return init error", new Object[]{MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()});
                                MapTaskMaster.this.updateNewInstanceStatus(MapTaskMaster.this.getSerialNum(), InstanceStatus.FAILED, "timeout dispatch return init error");
                            }
                        } else {
                            // Any other error: mark every task FAILED
                            String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(MapTaskMaster.this.jobInstanceInfo.getJobId(), MapTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                            MapTaskMaster.LOGGER.error(......, new Object[]{MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size(), e});
                            MapTaskMaster.this.logCollector.collect(uniqueIdWithoutTask, "map task dispatch fail.", e);
                            for (MasterStartContainerRequest req : reqs) {
                                ((TaskProgressCounter) MapTaskMaster.this.taskProgressMap.get(req.getTaskName())).incrementFailed();
                                ((WorkerProgressCounter) MapTaskMaster.this.workerProgressMap.get(workerAddr)).incrementFailed();
                                ContainerReportTaskStatusRequest failReq = ContainerReportTaskStatusRequest.newBuilder()
                                        .setJobId(MapTaskMaster.this.jobInstanceInfo.getJobId())
                                        .setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId())
                                        .setTaskId(req.getTaskId())
                                        .setStatus(TaskStatus.FAILED.getValue())
                                        .setWorkerId(workerId)
                                        .setWorkerAddr(workerAddr)
                                        .setTaskName(req.getTaskName())
                                        .build();
                                MapTaskMaster.this.updateTaskStatus(failReq);
                            }
                        }
                    }
                }, this.futureExecutor);
            }
        } catch (Throwable t) {
            ......
        }
    }
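    The two callbacks above reduce to a small status rule applied to every task in the batch: a timeout resets it to INIT so it can be pulled again, while a rejected batch (response.getSuccess() is false) or any other failure marks it FAILED. The success branch is elided in the article, so the sketch leaves it open. DispatchOutcome and its enum constants are hypothetical names.

```java
import java.util.Optional;

/** Hypothetical summary of how batchHandleContainers reacts to a PUSH outcome. */
final class DispatchOutcome {
    enum TaskStatus { INIT, FAILED }                         // only the states used below
    enum Outcome { ACCEPTED, REJECTED, TIMEOUT, OTHER_ERROR }

    /** Status written back for every task in the batch; empty means "not shown in the article". */
    static Optional<TaskStatus> statusAfter(Outcome outcome) {
        switch (outcome) {
            case TIMEOUT:
                return Optional.of(TaskStatus.INIT);   // pulled from the database and re-queued later
            case REJECTED:                              // response.getSuccess() == false
            case OTHER_ERROR:                           // any non-timeout failure
                return Optional.of(TaskStatus.FAILED);
            default:
                return Optional.empty();                // success branch is elided
        }
    }
}
```

    Treating a timeout differently from an explicit failure makes sense because a timed-out batch may simply not have reached the worker, so retrying it is safer than failing it.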
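    During dispatch, if a send times out, the tasks' status is set back to INIT. MapTaskMaster runs a periodic job that pulls INIT tasks out of the database and writes them back to taskBlockingQueue to be sent again. The core of that job, in init(), is: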

    protected void init() {
        ......
        // Pull tasks whose status is INIT from the database
        List<TaskInfo> taskInfos = MapTaskMaster.this.taskPersistence.pull(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), MapTaskMaster.this.pageSize);
        if (taskInfos.isEmpty()) {
            Thread.sleep(10000L);
        } else {
            for (TaskInfo taskInfo : taskInfos) {
                String taskName = taskInfo.getTaskName();
                // Write the task back to taskBlockingQueue
                Builder builder = MapTaskMaster.this.convert2StartContainerRequestBuilder(MapTaskMaster.this.jobInstanceInfo, taskInfo.getTaskId(), taskInfo.getTaskName(), taskBody, true);
                MapTaskMaster.this.taskBlockingQueue.submitRequest(builder.build());
            }
        }
    }
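    The re-pull job can be sketched in memory, with a set of INIT task IDs standing in for the database and a BlockingQueue standing in for taskBlockingQueue. InitRepuller is hypothetical; in particular, this sketch removes pulled IDs from the INIT set so they are not pulled twice, whereas what taskPersistence.pull does to a pulled task's status is not shown in the article.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

/** Hypothetical in-memory sketch of re-queuing tasks whose status was reset to INIT. */
final class InitRepuller {
    private final Set<Long> initTaskIds = new LinkedHashSet<>(); // stands in for INIT rows in the database
    private final BlockingQueue<Long> taskBlockingQueue;

    InitRepuller(BlockingQueue<Long> taskBlockingQueue) {
        this.taskBlockingQueue = taskBlockingQueue;
    }

    /** Called on dispatch timeout: the tasks go back to INIT. */
    synchronized void markInit(Collection<Long> taskIds) {
        initTaskIds.addAll(taskIds);
    }

    /** Returns up to pageSize INIT task IDs, removing them from the INIT set. */
    synchronized List<Long> pull(int pageSize) {
        List<Long> page = new ArrayList<>();
        Iterator<Long> it = initTaskIds.iterator();
        while (it.hasNext() && page.size() < pageSize) {
            page.add(it.next());
            it.remove();
        }
        return page;
    }

    /** One pass: pull a page and re-queue it; returns how many tasks were re-queued. */
    synchronized int pollOnce(int pageSize) {
        int queued = 0;
        for (Long taskId : pull(pageSize)) {
            if (taskBlockingQueue.offer(taskId)) {
                queued++;
            } else {
                initTaskIds.add(taskId); // queue full: keep it INIT for the next pass
            }
        }
        return queued;
    }

    /** Loop form: sleeps when nothing was re-queued (the article's loop sleeps 10 s when the pull is empty). */
    void runLoop(int pageSize, long emptySleepMs) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            if (pollOnce(pageSize) == 0) {
                Thread.sleep(emptySleepMs);
            }
        }
    }
}
```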

    Original article: https://blog.csdn.net/bao2901203013/article/details/126652741