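Schedulerx2.0 is an Akka-based distributed task scheduling framework developed by Alibaba. It provides distributed execution, multiple task types, unified logging, and other features. Users only need to depend on the schedulerx-worker jar and, using the programming model that Schedulerx2.0 provides, can build a highly reliable, operable distributed execution engine in just a few lines of code. This article explains how schedulerx-worker works.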
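Schedulerx2.0 is a centralized scheduling framework made up of a Server and Workers. The Server triggers and schedules jobs and submits them to Workers through its dispatch engine. Workers execute the jobs.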
The Worker is divided into three layers: TaskMaster, Container, and Processor.
The Server triggers a job and submits it to a Worker. The Worker receives the submitted job through JobInstanceActor:
- public class JobInstanceActor extends UntypedActor {
- private TaskMasterPool masterPool;
- private LogCollector logCollector;
- private static final Logger LOGGER = LogFactory.getLogger(JobInstanceActor.class);
-
- public JobInstanceActor() {
- this.masterPool = TaskMasterPool.INSTANCE;
- this.logCollector = LogCollectorFactory.get();
- }
-
- public void onReceive(Object obj) throws Throwable {
- if (obj instanceof ServerSubmitJobInstanceRequest) {
- this.handleSubmitJobInstance((ServerSubmitJobInstanceRequest)obj);
- } else if (obj instanceof ServerKillJobInstanceRequest) {
- this.handleKillJobInstance((ServerKillJobInstanceRequest)obj);
- } else if (obj instanceof ServerRetryTasksRequest) {
- this.handleRetryTasks((ServerRetryTasksRequest)obj);
- } else if (obj instanceof ServerKillTaskRequest) {
- this.handleKillTask((ServerKillTaskRequest)obj);
- } else if (obj instanceof ServerCheckTaskMasterRequest) {
- this.handCheckTaskMaster((ServerCheckTaskMasterRequest)obj);
- } else if (obj instanceof MasterNotifyWorkerPullRequest) {
- this.handleInitPull((MasterNotifyWorkerPullRequest)obj);
- } else if (obj instanceof ServerThreadDumpRequest) {
- this.handleThreadDump((ServerThreadDumpRequest)obj);
- } else if (obj instanceof ServerPushLogConfigRequest) {
- this.handlePushLogConfig((ServerPushLogConfigRequest)obj);
- }
-
- }
The message class for job execution is ServerSubmitJobInstanceRequest. When the Worker receives this message, it passes it to the function handleSubmitJobInstance:
- private void handleSubmitJobInstance(ServerSubmitJobInstanceRequest request) {
-
- ServerSubmitJobInstanceResponse response = null;
- // The job instance is already running, so return immediately
- if (this.masterPool.contains(request.getJobInstanceId())) {
-
- this.logCollector.collect(IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), ClientLoggerMessage.appendMessage("server trigger client fail.", new String[]{errMsg}));
- response = ServerSubmitJobInstanceResponse.newBuilder().setSuccess(false).setMessage(errMsg).build();
- this.getSender().tell(response, this.getSelf());
- } else {
- response = ServerSubmitJobInstanceResponse.newBuilder().setSuccess(true).build();
- this.getSender().tell(response, this.getSelf());
-
- try {
- JobInstanceInfo jobInstanceInfo = this.convet2JobInstanceInfo(request);
- // Create the TaskMaster that matches the job type
- TaskMaster taskMaster = TaskMasterFactory.create(jobInstanceInfo, this.getContext());
-
- this.masterPool.put(jobInstanceInfo.getJobInstanceId(), taskMaster);
- // Submit the job instance for execution
- taskMaster.submitInstance(jobInstanceInfo);
-
- ......
- } catch (Throwable var5) {
- ......
- }
- }
-
- }
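The duplicate-submission guard in handleSubmitJobInstance can be sketched in plain Java. This is only an illustration: `InstanceRegistry` and `trySubmit` are hypothetical names standing in for TaskMasterPool, and the sketch swaps the contains-then-put sequence for an atomic `putIfAbsent`, which is not what the excerpt above does (an actor handles one message at a time, so it does not need that).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for TaskMasterPool: maps a job instance id to its master.
class InstanceRegistry {
    private final ConcurrentMap<Long, String> masters = new ConcurrentHashMap<Long, String>();

    // Registers a master for the instance unless one already exists.
    // Returns true if this call registered it, false for a duplicate submission.
    boolean trySubmit(long jobInstanceId, String masterName) {
        // putIfAbsent is atomic, so two concurrent submissions cannot both win.
        return masters.putIfAbsent(jobInstanceId, masterName) == null;
    }

    // Removes the instance once it has finished, so the same id could be submitted again.
    void remove(long jobInstanceId) {
        masters.remove(jobInstanceId);
    }
}
```

A second `trySubmit` for the same id returns false, which corresponds to the `setSuccess(false)` branch in the excerpt.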
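handleSubmitJobInstance creates a TaskMaster object to run the job. TaskMaster is an abstract class with several subclasses, one for each distributed programming model. This article uses a Map job as the example to walk through the execution flow.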
Map-model jobs support three execution modes: parallel computing, in-memory grid, and grid computing.
MapTaskMaster, a subclass of TaskMaster, is defined as follows:
- public abstract class MapTaskMaster extends TaskMaster {
-
- protected volatile int pageSize = ConfigUtil.getWorkerConfig().getInt("map.master.page.size", 100);
- protected volatile int queueSize = ConfigUtil.getWorkerConfig().getInt("map.master.queue.size", 10000);
- private volatile int dispatcherSize = ConfigUtil.getWorkerConfig().getInt("map.master.dispatcher.size", 5);
- protected ReqQueue taskStatusReqQueue;
- protected TMStatusReqHandler taskStatusReqBatchHandler;
- // Holds the tasks produced by map
- protected ReqQueue taskBlockingQueue;
- // Holds the tasks that are about to be dispatched and forwards them to the workers for execution
- protected TaskDispatchReqHandler taskDispatchReqHandler;
- private volatile String rootTaskResult;
- protected TaskPersistence taskPersistence;
- protected Map taskProgressMap = Maps.newConcurrentMap();
- protected Map workerProgressMap = Maps.newConcurrentMap();
- private Map taskResultMap = Maps.newHashMap();
- private Map taskStatusMap = Maps.newHashMap();
- ......
- }
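The pairing of a bounded request queue with a batch handler that works in pages (see the `map.master.queue.size` and `map.master.page.size` settings above) can be sketched as follows. `PagedQueue`, `offer`, and `nextPage` are hypothetical names, and the real ReqQueue and handler classes are not shown in this article, so treat this as an assumption about the general pattern rather than the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a ReqQueue plus the paging done by its batch handler.
class PagedQueue<T> {
    private final BlockingQueue<T> queue;
    private final int pageSize;

    PagedQueue(int queueSize, int pageSize) {
        this.queue = new LinkedBlockingQueue<T>(queueSize);
        this.pageSize = pageSize;
    }

    // Returns false instead of blocking when the queue is full.
    boolean offer(T request) {
        return queue.offer(request);
    }

    // Removes and returns at most pageSize queued requests, oldest first.
    // The list is empty when nothing is waiting.
    List<T> nextPage() {
        List<T> page = new ArrayList<T>(pageSize);
        queue.drainTo(page, pageSize);
        return page;
    }
}
```

A periodic handler thread would call `nextPage()` in a loop and process each page as one batch, which keeps the number of database writes or remote calls proportional to the number of pages rather than the number of tasks.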
The submitInstance function of MapTaskMaster is as follows:
- public void submitInstance(JobInstanceInfo jobInstanceInfo) throws Exception {
- try {
- ......
- // Initialize the objects and start the tasks in the thread pools
- this.startBatchHandler();
- this.createRootTask();
- this.init();
- } catch (Throwable var4) {
- ......
- }
-
- }
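1. startBatchHandler performs initialization and starts the periodically executed tasks in the thread pool.
2. createRootTask creates a root task, which is the task that maps out the sub-tasks.
3. init starts the periodically executed tasks in the thread pool.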
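The idea of a root task that maps out sub-tasks can be illustrated with a small, single-threaded sketch. Only the name MAP_TASK_ROOT comes from the source; `MapModelSketch`, `process`, `run`, and the `shard-N` task names are hypothetical, and the real framework distributes the sub-tasks across workers instead of running them in a local loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

class MapModelSketch {
    static final String ROOT = "MAP_TASK_ROOT";

    // Hypothetical processor: the root task maps out sub-tasks, every other task is a leaf.
    static List<String> process(String task) {
        if (ROOT.equals(task)) {
            return Arrays.asList("shard-0", "shard-1", "shard-2");
        }
        return Collections.emptyList();
    }

    // Runs the root task first, then every task it maps out, and returns the execution order.
    static List<String> run() {
        Deque<String> pending = new ArrayDeque<String>();
        List<String> executed = new ArrayList<String>();
        pending.add(ROOT);
        while (!pending.isEmpty()) {
            String task = pending.poll();
            executed.add(task);
            // Newly mapped sub-tasks join the end of the queue.
            pending.addAll(process(task));
        }
        return executed;
    }
}
```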
The code of createRootTask is as follows:
- protected void createRootTask() throws Exception {
- String taskName = "MAP_TASK_ROOT";
- ByteString taskBody = ByteString.copyFrom(HessianUtil.toBytes("MAP_TASK_ROOT"));
- // Initialize the task counter: the current job has one task running
- this.initTaskProgress(taskName, 1);
- // Convert the parameters into a MasterStartContainerRequest
- MasterStartContainerRequest startContainerRequest = this.convert2StartContainerRequest(this.jobInstanceInfo, this.aquireTaskId(), taskName, taskBody);
- // Dispatch the root task to the local Worker for execution
- this.batchDispatchTasks(Lists.newArrayList(new MasterStartContainerRequest[]{startContainerRequest}), this.getLocalWorkerIdAddr());
- }
The batchDispatchTasks function assigns tasks to the Workers for execution. The root task is assigned to the local Worker:
- public void batchDispatchTasks(List<MasterStartContainerRequest> masterStartContainerRequests, String remoteWorker) {
- Map<String, List<MasterStartContainerRequest>> worker2ReqsWithNormal = Maps.newHashMap();
- Map<String, List<MasterStartContainerRequest>> worker2ReqsWithFailover = Maps.newHashMap();
- this.batchHandlePulledProgress(masterStartContainerRequests, worker2ReqsWithNormal, worker2ReqsWithFailover, remoteWorker);
- Iterator var5 = worker2ReqsWithNormal.entrySet().iterator();
-
- Entry entry;
- while(var5.hasNext()) {
- entry = (Entry)var5.next();
- this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), false, TaskDispatchMode.PUSH);
- }
-
- var5 = worker2ReqsWithFailover.entrySet().iterator();
-
- while(var5.hasNext()) {
- entry = (Entry)var5.next();
- this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), true, TaskDispatchMode.PUSH);
- }
-
- }
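The step of splitting a list of requests into one batch per worker can be sketched as below. How batchHandlePulledProgress actually picks a worker is not shown in this article, so the round-robin assignment here is purely an assumption, and `DispatchSketch` and `groupByWorker` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DispatchSketch {
    // Assigns tasks to workers round-robin (an assumed policy) and returns one batch per worker.
    static Map<String, List<String>> groupByWorker(List<String> tasks, List<String> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        Map<String, List<String>> batches = new LinkedHashMap<String, List<String>>();
        for (int i = 0; i < tasks.size(); i++) {
            String worker = workers.get(i % workers.size());
            List<String> batch = batches.get(worker);
            if (batch == null) {
                batch = new ArrayList<String>();
                batches.put(worker, batch);
            }
            batch.add(tasks.get(i));
        }
        return batches;
    }
}
```

Each map entry then corresponds to one call of batchHandleContainers in the excerpt above: the key is the worker address and the value is the batch sent to it.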
The batchHandleContainers function writes the tasks to the local H2 database and forwards them to the corresponding worker.
- private void batchHandleContainers(final String workerIdAddr, final List<MasterStartContainerRequest> reqs, boolean isFailover, TaskDispatchMode dispatchMode) {
- ......
-
- try {
- // Write the tasks to the local H2 database with status running
- this.batchHandlePersistence(workerId, workerAddr, reqs, isFailover);
- if (dispatchMode.equals(TaskDispatchMode.PUSH)) {
- // Send the tasks to the corresponding worker
- final long startTime = System.currentTimeMillis();
- ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
- MasterBatchStartContainersRequest request = MasterBatchStartContainersRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setJobId(this.jobInstanceInfo.getJobId()).addAllStartReqs(reqs).build();
- Timeout timeout = new Timeout(Duration.create(15L, TimeUnit.SECONDS));
- Future ......
- }
-
- } catch (Throwable var13) {
- ......
- }
-
- }
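The "persist first, then push with a timeout" order used by batchHandleContainers can be sketched without Akka. In this sketch a `CompletableFuture` stands in for the reply to the remote request and an in-memory list stands in for the H2 table. `PersistThenPush` and `dispatch` are hypothetical names, and what the real TaskMaster does when the push fails is not covered by the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class PersistThenPush {
    // Stand-in for the local H2 task table.
    final List<String> store = new ArrayList<String>();

    // Persists the batch first, then waits up to timeoutMillis for the worker's acknowledgement.
    // Returns true only if the worker acknowledged the batch in time.
    boolean dispatch(List<String> batch, CompletableFuture<Boolean> ack, long timeoutMillis) {
        store.addAll(batch);
        try {
            return ack.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // Timeout or remote failure: report the push as failed.
            return false;
        }
    }
}
```

Because the tasks are stored before the push, a batch whose acknowledgement never arrives is still on record and can be retried or failed over later.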
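Container is the container framework that runs the business logic. Tasks that the TaskMaster forwards to a worker are executed by the Container module. ContainerRoutingActor is a router Actor that holds multiple ContainerActor instances and forwards each message it receives to one of them. ContainerActor is defined as follows: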
- public class ContainerActor extends UntypedActor {
- public void onReceive(Object obj) throws Throwable {
- if (obj instanceof MasterStartContainerRequest) {
- this.handleStartContainer((MasterStartContainerRequest)obj);
- } else if (obj instanceof MasterBatchStartContainersRequest) {
- this.handleBatchStartContainers((MasterBatchStartContainersRequest)obj);
- } else if (obj instanceof MasterKillContainerRequest) {
- this.handleKillContainer((MasterKillContainerRequest)obj);
- } else if (obj instanceof MasterDestroyContainerPoolRequest) {
- this.handleDestroyContainerPool((MasterDestroyContainerPoolRequest)obj);
- }
-
- }
- }
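The way a router hands each incoming message to exactly one of several routees can be sketched in plain Java. The article does not say which routing strategy ContainerRoutingActor uses, so round-robin is an assumption here, and `RoundRobinRouter`, `route`, and `inbox` are hypothetical names. The sketch is single-threaded and uses lists in place of actor mailboxes.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinRouter {
    // One list per routee, standing in for that routee's mailbox.
    private final List<List<String>> routees = new ArrayList<List<String>>();
    private int next = 0;

    RoundRobinRouter(int routeeCount) {
        if (routeeCount <= 0) {
            throw new IllegalArgumentException("routeeCount must be positive");
        }
        for (int i = 0; i < routeeCount; i++) {
            routees.add(new ArrayList<String>());
        }
    }

    // Forwards the message to exactly one routee and returns that routee's index.
    int route(String message) {
        int index = next;
        next = (next + 1) % routees.size();
        routees.get(index).add(message);
        return index;
    }

    // Messages delivered so far to the routee at the given index.
    List<String> inbox(int index) {
        return routees.get(index);
    }
}
```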
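When a message of type MasterBatchStartContainersRequest is received, it goes to handleBatchStartContainers, and the function startContainer is then called to execute the task: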
- private String startContainer(MasterStartContainerRequest request) throws Exception {
- String uniqueId = IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId());
-
- JobContext context = ContanerUtil.convert2JobContext(request);
- Container container = ContainerFactory.create(context);
- if (container != null) {
- this.containerPool.submit(context.getJobId(), context.getJobInstanceId(), context.getTaskId(), container, consumerNum);
-
- }
- ......
- }
Finally, the task is submitted through submit and executed.
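Submitting a container to a pool can be pictured as handing a unit of work to a thread pool. The sketch below uses a standard `ExecutorService` for this. `ContainerPoolSketch` and `submitAndWait` are hypothetical names, and the sketch waits for the result only so that the outcome is easy to observe. The real containerPool.submit takes the job id, instance id, task id, container, and consumerNum, and its internals are not shown in this article.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ContainerPoolSketch {
    private final ExecutorService pool;

    ContainerPoolSketch(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Runs the processor on a pool thread and returns "<uniqueId>:<status>".
    String submitAndWait(String uniqueId, Callable<String> processor) {
        Future<String> future = pool.submit(processor);
        try {
            return uniqueId + ":" + future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return uniqueId + ":INTERRUPTED";
        } catch (ExecutionException e) {
            // The business logic threw an exception.
            return uniqueId + ":FAILED";
        }
    }

    // Stops accepting new work and lets the pool threads exit.
    void shutdown() {
        pool.shutdown();
    }
}
```

The processor passed in plays the role of the user's business logic, which is the Processor layer of the three layers named at the start of the article.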