SchedulerX 2.0 provides several distributed programming models. The following uses a Map-model job to illustrate the execution flow.
An example of a Map job:
- @Component
- public class TestMapJobProcessor extends MapJobProcessor {
-
- @Override
- public ProcessResult process(JobContext context) throws Exception {
- String taskName = context.getTaskName();
- int dispatchNum = 50;
- if (isRootTask(context)) {
- System.out.println("start root task");
- List<String> msgList = Lists.newArrayList();
- for (int i = 0; i <= dispatchNum; i++) {
- msgList.add("msg_" + i);
- }
- return map(msgList, "Level1Dispatch");
- } else if (taskName.equals("Level1Dispatch")) {
- String task = (String)context.getTask();
- System.out.println(task);
- return new ProcessResult(true);
- }
-
- return new ProcessResult(false);
- }
-
- }
After a worker receives a task, it executes the process function. For the root task (isRootTask returns true) it produces a list of sub-tasks and enters the map flow; for the sub-tasks named "Level1Dispatch" it simply processes the task payload.
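The same taskName-based branching extends naturally to deeper task trees. Below is a minimal illustrative sketch (not from the SchedulerX source; the class name, task names and fan-out sizes are made up) of a two-level dispatch that reuses only the APIs seen above:
- @Component
- public class TwoLevelMapJobProcessor extends MapJobProcessor {
-
-     @Override
-     public ProcessResult process(JobContext context) throws Exception {
-         String taskName = context.getTaskName();
-         if (isRootTask(context)) {
-             // root task: fan out 10 first-level sub-tasks
-             List<String> level1 = Lists.newArrayList();
-             for (int i = 0; i < 10; i++) {
-                 level1.add("level1_" + i);
-             }
-             return map(level1, "Level1Dispatch");
-         } else if ("Level1Dispatch".equals(taskName)) {
-             // each first-level task fans out 100 second-level sub-tasks
-             String parent = (String) context.getTask();
-             List<String> level2 = Lists.newArrayList();
-             for (int i = 0; i < 100; i++) {
-                 level2.add(parent + "_sub_" + i);
-             }
-             return map(level2, "Level2Dispatch");
-         } else if ("Level2Dispatch".equals(taskName)) {
-             // leaf task: the real business logic runs here
-             System.out.println((String) context.getTask());
-             return new ProcessResult(true);
-         }
-         return new ProcessResult(false);
-     }
- }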
The code of MapJobProcessor#map is as follows:
- public ProcessResult map(List<? extends Object> taskList, String taskName) {
- ProcessResult result = new ProcessResult(false);
- JobContext context = ContainerFactory.getContainerPool().getContext();
- // look up the TaskMaster actor
- ActorSelection masterAkkaSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
- if (masterAkkaSelection == null) {
- ......
- result.setResult(errMsg);
- return result;
- } else if (CollectionUtils.isEmpty(taskList)) {
-
- return result;
- } else {
- // send the tasks in batches of at most batchSize
- int batchSize = ConfigUtil.getWorkerConfig().getInt("worker.map.page.size", 1000);
- int size = taskList.size();
- int quotient = size / batchSize;
- int remainder = size % batchSize;
- int batchNumber = remainder > 0 ? quotient + 1 : quotient;
- List<WorkerMapTaskRequest.Builder> builders = Lists.newArrayList();
-
- int position;
- for(position = 0; position < batchNumber; ++position) {
- builders.add(WorkerMapTaskRequest.newBuilder());
- }
-
- position = 0;
- int maxTaskBodySize = ConfigUtil.getWorkerConfig().getInt("task.body.size.max", 65536);
-
- try {
- Iterator var14 = taskList.iterator();
-
- while(var14.hasNext()) {
- Object task = var14.next();
- this.checkTaskObject(task);
- int batchIdx = position++ / batchSize;
- byte[] taskBody = HessianUtil.toBytes(task);
-
-
-
- ((Builder)builders.get(batchIdx)).addTaskBody(ByteString.copyFrom(taskBody));
- }
-
- position = 0;
- var14 = builders.iterator();
-
- while(var14.hasNext()) {
- Builder builder = (Builder)var14.next();
- builder.setJobId(context.getJobId());
- builder.setJobInstanceId(context.getJobInstanceId());
- builder.setTaskId(context.getTaskId());
- builder.setTaskName(taskName);
- WorkerMapTaskResponse response = null;
- byte retryCount = 0;
-
- try {
- TaskMaster taskMaster = TaskMasterPool.INSTANCE.get(context.getJobInstanceId());
- // if the TaskMaster is on this worker, handle the request locally
- if (taskMaster != null && taskMaster instanceof MapTaskMaster) {
- // handle the map sub-tasks
- response = this.handleMapTask(taskMaster, builder.build());
- } else {
- // otherwise forward the request to the remote TaskMaster
- response = (WorkerMapTaskResponse)FutureUtils.awaitResult(masterAkkaSelection, builder.build(), 30L);
- }
- } catch (TimeoutException var19) {
- ......
-
- }
-
- if (!response.getSuccess()) {
- LOGGER.error(response.getMessage());
- this.logCollector.collect(context.getUniqueId(), response.getMessage());
- result.setResult(response.getMessage());
- return result;
- }
-
- builders.set(position++, null);
- if (response.hasOverload() && response.getOverload()) {
- LOGGER.warn("Task Master is busy, sleeping a while {}s...", new Object[]{10});
- Thread.sleep(10000L);
- }
- }
-
- result.setStatus(true);
- } catch (Throwable var20) {
- ......
- }
-
- return result;
- }
- }
The map function first splits the tasks into batches and then sends each batch to the TaskMaster in turn.
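To make the batching concrete, here is a small standalone example (illustrative numbers only) of the quotient/remainder arithmetic used above, with batchSize taken from worker.map.page.size (default 1000):
- public class BatchCountExample {
-     public static void main(String[] args) {
-         int size = 2500;      // number of sub-tasks returned by the processor
-         int batchSize = 1000; // worker.map.page.size, default 1000
-         int quotient = size / batchSize;   // 2
-         int remainder = size % batchSize;  // 500
-         int batchNumber = remainder > 0 ? quotient + 1 : quotient;
-         // three WorkerMapTaskRequest batches: 1000 + 1000 + 500 tasks
-         System.out.println(batchNumber);
-     }
- }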
The function MapJobProcessor#handleMapTask:
- private WorkerMapTaskResponse handleMapTask(TaskMaster taskMaster, WorkerMapTaskRequest request) throws Exception {
- WorkerMapTaskResponse response = null;
-
- try {
- long jobInstanceId = request.getJobInstanceId();
- if (taskMaster != null) {
- if (!(taskMaster instanceof MapTaskMaster)) {
- ......
- } else {
- try {
- long startTime = System.currentTimeMillis();
- // call MapTaskMaster's map function
- boolean overload = ((MapTaskMaster)taskMaster).map(request.getTaskBodyList(), request.getTaskName());
-
- response = WorkerMapTaskResponse.newBuilder().setSuccess(true).setOverload(overload).build();
- } catch (Exception var9) {
- ......
- }
- }
- } else {
- ......
- }
- } catch (Throwable var10) {
- ......
- }
-
- return response;
- }
The request is handed to MapTaskMaster's map function. MapTaskMaster#map:
- public boolean map(List<ByteString> taskList, String taskName) throws Exception {
-
- this.initTaskProgress(taskName, taskList.size());
- Iterator var3 = taskList.iterator();
-
- while(var3.hasNext()) {
- ByteString taskBody = (ByteString)var3.next();
- // build the start-container request for the task
- MasterStartContainerRequest startContainerRequest = this.convert2StartContainerRequest(this.jobInstanceInfo, this.aquireTaskId(), taskName, taskBody);
- // put the request into the queue taskBlockingQueue
- this.taskBlockingQueue.submitRequest(startContainerRequest);
- }
-
- return this.machineOverload();
- }
The map function only writes the requests into the queue; a separate thread later takes them out of taskBlockingQueue and dispatches them asynchronously.
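ReqQueue and its handlers are SchedulerX-internal classes, but the hand-off is conceptually a plain producer/consumer over a bounded queue. A minimal sketch of that pattern using only java.util.concurrent (an analogy, not the actual implementation; the capacity and batch size of 256 mirror BATCH_SIZE below):
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
-
- public class QueueHandoffExample {
-     // bounded queue, similar in spirit to taskBlockingQueue (capacity BATCH_SIZE * 4)
-     private static final BlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(1024);
-
-     public static void main(String[] args) throws InterruptedException {
-         // consumer: drains the queue in batches, like the batch-retrieve thread
-         Thread consumer = new Thread(() -> {
-             while (!Thread.currentThread().isInterrupted()) {
-                 List<String> batch = new ArrayList<>();
-                 QUEUE.drainTo(batch, 256);
-                 if (batch.isEmpty()) {
-                     try { Thread.sleep(100L); } catch (InterruptedException e) { return; }
-                     continue;
-                 }
-                 System.out.println("dispatching batch of " + batch.size());
-             }
-         });
-         consumer.start();
-
-         // producer: map() submitting start-container requests
-         for (int i = 0; i < 1000; i++) {
-             QUEUE.put("task_" + i);
-         }
-         Thread.sleep(1000L);
-         consumer.interrupt();
-     }
- }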
ParallelTaskMaster is a subclass of MapTaskMaster; it is defined as follows:
- public class ParallelTaskMaster extends MapTaskMaster {
- private static final Logger LOGGER = LogFactory.getLogger(ParallelTaskMaster.class);
- private LogCollector logCollector = LogCollectorFactory.get();
- private static final Integer BATCH_SIZE = 256;
-
- public ParallelTaskMaster(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
- super(jobInstanceInfo, actorContext);
- this.taskPersistence = new ServerTaskPersistence(jobInstanceInfo.getGroupId());
- long jobInstanceId = jobInstanceInfo.getJobInstanceId();
- this.taskStatusReqQueue = new ReqQueue(jobInstanceId, 1024);
- this.taskStatusReqBatchHandler = new TMStatusReqHandler(jobInstanceId, 1, 1, BATCH_SIZE * 2 * jobInstanceInfo.getAllWorkers().size(), this.taskStatusReqQueue);
- // queue holding the tasks waiting to be dispatched
- this.taskBlockingQueue = new ReqQueue(jobInstanceId, BATCH_SIZE * 4);
- if (jobInstanceInfo.getXattrs() != null) {
- this.xAttrs = (MapTaskXAttrs)JsonUtil.fromJson(jobInstanceInfo.getXattrs(), MapTaskXAttrs.class);
- }
-
- if (this.xAttrs != null && this.xAttrs.getTaskDispatchMode().equals(TaskDispatchMode.PULL.getValue())) {
- // handler for pull-mode (timed pull) dispatch
- this.taskDispatchReqHandler = new TaskPullReqHandler(jobInstanceId, 1, 2, BATCH_SIZE * jobInstanceInfo.getAllWorkers().size(), this.taskBlockingQueue);
- } else {
- this.taskDispatchReqHandler = new TaskPushReqHandler(jobInstanceId, 1, 2, BATCH_SIZE * jobInstanceInfo.getAllWorkers().size(), this.taskBlockingQueue, BATCH_SIZE);
- }
-
- }
- ......
- }
taskDispatchReqHandler is the handler responsible for dispatching tasks; it is started by MapTaskMaster's startup method startBatchHandler.
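startBatchHandler itself is not listed in this article; presumably it only needs to start the two handlers created in the constructor shown above. A hypothetical sketch under that assumption:
- // Hypothetical sketch (not the actual SchedulerX source), assuming startBatchHandler
- // simply starts the handlers created in the ParallelTaskMaster constructor.
- protected void startBatchHandler() {
-     // thread that batches task-status reports from workers
-     this.taskStatusReqBatchHandler.start();
-     // thread that drains taskBlockingQueue and dispatches tasks (push or pull mode)
-     this.taskDispatchReqHandler.start();
- }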
TaskPushReqHandler extends TaskDispatchReqHandler, and TaskDispatchReqHandler extends BaseReqHandler. BaseReqHandler's start method is shown below.
BaseReqHandler#start
- public void start() {
- ......
- this.batchRetrieveThread = new Thread(new Runnable() {
- public void run() {
- while(true) {
- try {
- if (!Thread.currentThread().isInterrupted()) {
- List reqs = BaseReqHandler.this.asyncHandleReqs();
- BaseReqHandler.LOGGER.debug("jobInstanceId={}, batch retrieve reqs, size:{}, remain size:{}, batchSize:{}", new Object[]{BaseReqHandler.this.jobInstanceId, reqs.size(), BaseReqHandler.this.reqsQueue.size(), BaseReqHandler.this.batchSize});
- if ((float)reqs.size() >= (float)BaseReqHandler.this.batchSize * 0.8F) {
- continue;
- }
-
- if (reqs.isEmpty()) {
- Thread.sleep(BaseReqHandler.this.emptySleepMs);
- continue;
- }
-
- Thread.sleep(BaseReqHandler.this.defaultSleepMs);
- continue;
- }
- } catch (InterruptedException var2) {
- ;
- } catch (Throwable var3) {
- BaseReqHandler.LOGGER.error(var3);
- }
-
- return;
- }
- }
- }, this.batchRetrieveThreadName + this.jobInstanceId);
- this.batchRetrieveThread.start();
- }
The function asyncHandleReqs, which the retrieve thread executes periodically:
- private synchronized List asyncHandleReqs() {
- List reqs = this.reqsQueue.retrieveRequests(this.batchSize);
- if (!reqs.isEmpty()) {
- this.activeRunnableNum.incrementAndGet();
- this.process(this.jobInstanceId, reqs);
- }
-
- return reqs;
- }
It fetches a batch of tasks from the queue and then calls process to handle them.
TaskPushReqHandler#process
- public void process(long jobInstanceId, List reqs, String workerAddr) {
- this.batchProcessSvc.submit(new TaskPushReqHandler.BatchTasksDispatchRunnable(jobInstanceId, reqs));
- }
-
- private class BatchTasksDispatchRunnable implements Runnable {
- private long jobInstanceId;
- private List reqs;
-
- BatchTasksDispatchRunnable(long jobInstanceId, List reqs) {
- this.jobInstanceId = jobInstanceId;
- this.reqs = reqs;
- }
-
- public void run() {
- try {
- ((MapTaskMaster)TaskPushReqHandler.this.taskMasterPool.get(this.jobInstanceId)).batchDispatchTasks(this.reqs);
-
- } catch (Throwable var6) {
- TaskPushReqHandler.LOGGER.error(var6);
- } finally {
- TaskPushReqHandler.this.activeRunnableNum.decrementAndGet();
- }
-
- }
- }
The runnable calls batchDispatchTasks to dispatch the tasks in batches.
MapTaskMaster#batchDispatchTasks
- public void batchDispatchTasks(List<MasterStartContainerRequest> masterStartContainerRequests, String remoteWorker) {
- // tasks to be dispatched normally
- Map<String, List<MasterStartContainerRequest>> worker2ReqsWithNormal = Maps.newHashMap();
- // tasks being re-dispatched after a failure (failover)
- Map<String, List<MasterStartContainerRequest>> worker2ReqsWithFailover = Maps.newHashMap();
- this.batchHandlePulledProgress(masterStartContainerRequests, worker2ReqsWithNormal, worker2ReqsWithFailover, remoteWorker);
- Iterator var5 = worker2ReqsWithNormal.entrySet().iterator();
-
- Entry entry;
- while(var5.hasNext()) {
- entry = (Entry)var5.next();
- // dispatch the tasks to the assigned worker
- this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), false, TaskDispatchMode.PUSH);
- }
-
- var5 = worker2ReqsWithFailover.entrySet().iterator();
-
- while(var5.hasNext()) {
- entry = (Entry)var5.next();
- // dispatch the failover tasks to the assigned worker
- this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), true, TaskDispatchMode.PUSH);
- }
-
- }
batchDispatchTasks splits the tasks into normal tasks and failover (previously failed) tasks via batchHandlePulledProgress, then calls batchHandleContainers to dispatch each group.
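batchHandlePulledProgress itself is not listed in this article; conceptually it assigns each request to a worker and groups the requests by worker address, producing the normal and failover maps seen above. A hypothetical sketch of just the grouping step (simple round-robin; the method name and workerAddrs parameter are made up for illustration):
- // Hypothetical sketch, not the SchedulerX source: round-robin assignment of
- // requests to workers, producing the worker -> requests map that
- // batchHandleContainers consumes.
- Map<String, List<MasterStartContainerRequest>> groupByWorker(
-         List<MasterStartContainerRequest> reqs, List<String> workerAddrs) {
-     Map<String, List<MasterStartContainerRequest>> worker2Reqs = Maps.newHashMap();
-     int i = 0;
-     for (MasterStartContainerRequest req : reqs) {
-         String workerAddr = workerAddrs.get(i++ % workerAddrs.size());
-         worker2Reqs.computeIfAbsent(workerAddr, k -> Lists.newArrayList()).add(req);
-     }
-     return worker2Reqs;
- }
The batchHandleContainers method that receives each group is shown next: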
- private void batchHandleContainers(final String workerIdAddr, final List<MasterStartContainerRequest> reqs, boolean isFailover, TaskDispatchMode dispatchMode) {
- ......
-
- try {
- // persist the tasks' status to the local H2 database
- this.batchHandlePersistence(workerId, workerAddr, reqs, isFailover);
- // push the tasks to the target worker
- if (dispatchMode.equals(TaskDispatchMode.PUSH)) {
- final long startTime = System.currentTimeMillis();
- ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
- MasterBatchStartContainersRequest request = MasterBatchStartContainersRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setJobId(this.jobInstanceInfo.getJobId()).addAllStartReqs(reqs).build();
- Timeout timeout = new Timeout(Duration.create(15L, TimeUnit.SECONDS));
- // send the request to the corresponding worker
- Future<Object> future = Patterns.ask(selection, request, timeout);
- future.onSuccess(new OnSuccess<Object>() {
- public void onSuccess(Object obj) throws Throwable {
- MasterBatchStartContainersResponse response = (MasterBatchStartContainersResponse)obj;
- if (response.getSuccess()) {
- ......
- } else {
-
- Iterator var3 = reqs.iterator();
-
- while(var3.hasNext()) {
- MasterStartContainerRequest req = (MasterStartContainerRequest)var3.next();
- // update the task status in the database to FAILED
- ContainerReportTaskStatusRequest faileStatusRequest = ContainerReportTaskStatusRequest.newBuilder().setJobId(MapTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setTaskId(req.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(workerId).setTaskName(req.getTaskName()).setWorkerAddr(workerAddr).build();
- MapTaskMaster.this.updateTaskStatus(faileStatusRequest);
- }
- }
-
- }
- }, this.futureExecutor);
- future.onFailure(new OnFailure() {
- public void onFailure(Throwable e) throws Throwable {
- Iterator var3;
- MasterStartContainerRequest req;
- if (e instanceof TimeoutException) {
-
- List<Long> taskIds = Lists.newArrayList();
- var3 = reqs.iterator();
-
- while(var3.hasNext()) {
- req = (MasterStartContainerRequest)var3.next();
- taskIds.add(req.getTaskId());
- }
-
- try {
- // dispatch timed out: reset the task status to INIT so it can be re-dispatched
- int affectCnt = MapTaskMaster.this.taskPersistence.updateTaskStatus(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), taskIds, TaskStatus.INIT, workerId, workerAddr);
- ((WorkerProgressCounter)MapTaskMaster.this.workerProgressMap.get(workerAddr)).decrementRunning(affectCnt);
- } catch (Exception var6) {
- MapTaskMaster.LOGGER.error("jobInstanceId={}, timeout return init error", new Object[]{MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()});
- MapTaskMaster.this.updateNewInstanceStatus(MapTaskMaster.this.getSerialNum(), InstanceStatus.FAILED, "timeout dispatch return init error");
- }
- } else {
- String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(MapTaskMaster.this.jobInstanceInfo.getJobId(), MapTaskMaster.this.jobInstanceInfo.getJobInstanceId());
- MapTaskMaster.LOGGER.error(......, new Object[]{MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size(), e});
- MapTaskMaster.this.logCollector.collect(uniqueIdWithoutTask, "map task dispatch fail.", e);
- var3 = reqs.iterator();
- // other errors: update the task status to FAILED
- while(var3.hasNext()) {
- req = (MasterStartContainerRequest)var3.next();
- ((TaskProgressCounter)MapTaskMaster.this.taskProgressMap.get(req.getTaskName())).incrementFailed();
- ((WorkerProgressCounter)MapTaskMaster.this.workerProgressMap.get(workerAddr)).incrementFailed();
- ContainerReportTaskStatusRequest faileReq = ContainerReportTaskStatusRequest.newBuilder().setJobId(MapTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setTaskId(req.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(workerId).setTaskName(req.getTaskName()).setWorkerAddr(workerAddr).setTaskName(req.getTaskName()).build();
- MapTaskMaster.this.updateTaskStatus(faileReq);
- }
- }
-
- }
- }, this.futureExecutor);
- }
- } catch (Throwable var13) {
- ......
- }
-
- }
If dispatching a batch times out, the tasks' status is reset to INIT. MapTaskMaster runs a periodic job that pulls tasks with status INIT from the database and re-dispatches them:
- protected void init() {
- ......
- // pull tasks whose status is INIT from the database
- List<TaskInfo> taskInfos = MapTaskMaster.this.taskPersistence.pull(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), MapTaskMaster.this.pageSize);
-
- if (taskInfos.isEmpty()) {
-
- Thread.sleep(10000L);
- } else {
- Iterator var4 = taskInfos.iterator();
-
- while(var4.hasNext()) {
- TaskInfo taskInfo = (TaskInfo)var4.next();
- String taskName = taskInfo.getTaskName();
- // re-submit the task into the queue taskBlockingQueue
- Builder builder = MapTaskMaster.this.convert2StartContainerRequestBuilder(MapTaskMaster.this.jobInstanceInfo, taskInfo.getTaskId(), taskInfo.getTaskName(), taskBody, true);
- MapTaskMaster.this.taskBlockingQueue.submitRequest(builder.build());
- }
- }
- }