因为Nacos中有很多地方使用了这个TaskManager,所以我们得先了解一下这个类是干啥用的,方便后面阅读源码时候不会吃力;
先说结论:
TaskManager 可以看成是一个待执行的任务集合,用于处理一定要执行成功的任务 单线程的方式处理任务,保证任务一定被成功处理; 如果执行失败了,任务会被重新放入集合中等待下一次被消费;
AbstractTask是个抽象类,所有的需要被执行的任务都继续这个类; 这个类主要提供执行任务所需要的数据和方法;例如
/* 一个任务两次处理的间隔,单位是毫秒*/
private long taskInterval;
/*任务上次被处理的时间,用毫秒表示*/
private long lastProcessTime;
/* TaskManager 判断当前是否需要处理这个Task,子类可以Override这个函数实现自己的逻辑
*/
public boolean shouldProcess() {
return (System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval);
}
TaskProcessor 是任务处理器接口,它有个方法
boolean process(String taskType, AbstractTask task);
用于执行对应的AbstractTask任务类; 不同的任务类型,可以实现自己的执行任务逻辑;
TaskManager 是个任务管理类;
它里面有两个属性保存了待消费的任务AbstractTask,和任务执行需要的TaskProcessor;
/**待消费的任务AbstractTask**/
private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>();
/**任务AbstractTask对应的任务执行器TaskProcessor**/
private final ConcurrentHashMap<String, TaskProcessor> taskProcessors =new ConcurrentHashMap<String, TaskProcessor>();
如果taskProcessors中没有找到对应的任务执行器,那么它里面有一个默认执行器会执行
/**默认执行器**/
private TaskProcessor defaultTaskProcessor;
Nacos配置中心模块很重要一个功能就是,在初始化的时候以及每隔一段时间就会去数据库中把所有数据Dump到磁盘中;Dump就是一个任务类AbstractTask; 我们上面说过
AbstractTask就是一个信息承载对象,主要给TaskProcessor提供执行所需要的数据;我们看看DumpTask;
DumpTask定义了自己的一些属性; 再看看其他的例如DumpAllTask、DumpAllBetaTask
这两个任务类只定义了TASK_ID
既然有DumpTask任务类,那肯定就有对应的任务处理器类DumpProcessor;
DumpProcessor 是DumpTask任务的执行器;执行器中的方法
public boolean process(String taskType, AbstractTask task)
代码太长就不在这里分析了,它里面主要做的操作就是 保存配置文件到本地磁盘中,并缓存md5
详细可以看文章 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中
对应DumpAllTask、DumpAllBetaTask 任务的任务执行器有DumpAllProcessor、DumpAllBetaProcessor
上面是DumpAllTask的定义和DumpAllTaskProcessor执行器的定义;定义好了之后是怎么被触发的呢?
这个类就是专门Dump配置信息的服务类;上面提及的DumpAll就是在这里被调用的;我们来看下他主要方法;
@PostConstruct
public void init() {
DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
/**在new这个TaskManager类的时候,专门执行任务的一个线程就已经开始启动了,这不过这个时候还没有任务Task添加进去**/
dumpAllTaskMgr = new TaskManager( "com.alibaba.nacos.server.DumpAllTaskManager");
dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
Runnable dumpAll = new Runnable() {
@Override
public void run() {
dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
}
};
/**每10分钟执行一次DumpAll操作**/
TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
}
DumpService在初始化的时候回调用这个init方法;
1.先new了一个DumpAllProcessor执行器;
2.再new 了一个TaskManager任务管理器;在new这个任务管理器的时候,就会启动一个线程专门去执行所有待执行的任务;只不过这个时候还没有添加任务;
3.将这个任务管理器的默认执行器设置为DumpAllProcessor;
4.每十分钟执行一次往TaskManager中添加一个DumpAllTask的任务;一经添加就会被TaskManager中的线程 processingThread
执行process方法;