第一种方法,需要用async和await关键字将异步函数强制为同步状态,这样就可以同步获取每一个异步函数的结果了。(耗时长)
第二种就是使用Promise对象,将每个函数都装到Promise对象中,按照顺序放到.then方法中,变成串行执行,(耗时长)
第三种方法,需要定义全局变量,标记每个异步函数的执行结果状态,定义全局容器收集异步函数执行结果,定义全局检查函数检查执行结果状态,然后在每一个异步函数的完成的时候修改该状态,并且调用检查方法,在全部结束的时候表示异步函数结果收集完成。(代码复杂度高)
以上方式即可解决
于是乎,想到应该得有这么一个异步函数控制器,想串行就串行,想并行就并行,而且代码复杂度还得降低,网上搜索半天没有找到现成的代码,或者就是晦涩难懂。
思路如下:
每一个异步函数都看做成一个任务,当然同步函数亦可,这里借鉴批量程序的任务单元概念:
一个任务定义简单的生命周期,即任务初始化,任务执行中,任务结束。
这里先定义好枚举常量,增强代码易读性
- /**
- * 任务执行状态枚举类
- * @type {{INIT: number, RUNNING: number, FINISH: number}}
- */
- const TaskStatusEnum = {
- INIT: -1, //初始化
- RUNNING: 0, //执行中
- FINISH: 1 //已完成
- }
我们现在面向异步函数,如何知道异步函数这个任务什么时候结束?代码执行完最后一行就算结束?不对吧,代码执行完最后一行,异步函数似乎并没有结束。
而且js中也没办法去检测一个函数到底有没有结束,就算可以,那什么时机去检测呢?定时轮训可以不,异步的不确定性,定多少时间合适呢?1秒检测一次?还是1毫秒一次?检测频率越慢,这中间会造成延迟越大,检测越快,越影响代码执行效率,因为js是单线程。
我在刚学js的时候,会碰到这些疑问,原因是习惯性按照同步的思路去思考问题。
但js有个概念:事件驱动。比如:根据工作任务分配原则,A同事的工作完成后,我们要让A主动汇报自己已经完成工作了,而不是每次让我们一次又一次观察A的情况。A的汇报就是事件驱动,我们给A交代了一个事情(每次工作完要主动汇报)。
回归正题:我们需要让异步函数工作完成,主动通知自己工作已经完成。这里又是一个面向对象编程的思想,我们定义一个任务类:
类的实例化被看做任务初始化
类中提供一个开始执行的方法,控制任务的执行时机,修改自己的生命周期为正在执行
类中还提供一个结束的方法,用于任务执行过程中随时可主动修改自己的生命周期为已结束
代码如下:
- /**
- * 任务单元
- * @param taskId 任务id
- * @param task 任务函数
- * @param group 任务组对象
- * @constructor
- */
- function TaskUnit(taskId, task, group) {
- this.taskId = taskId; //任务唯一标识
- this.status = TaskStatusEnum.INIT; //执行状态:-1初始化 0执行中 1已完成
- this.task = task; //任务内容
- /**
- * 执行任务内容
- */
- this.execute = () => {
- this.status = TaskStatusEnum.RUNNING;
- this.task(this.end);
- }
- /**
- * 任务完成回调
- */
- this.end = () => {
- if (this.status === TaskStatusEnum.RUNNING) {
- this.status = TaskStatusEnum.FINISH
- group.check();
- }
- }
- }
额(⊙o⊙)…目测这个类,好像和Promise长得差不多。少了异常捕获catch和finally两个函数,这两个不是我任务单元所关心的,因此这些东西交给实际任务内容去处理。
多的是什么呢?任务并不是实例化之后被立刻调用,而是通过execute函数,让任务可在任意时机被调用,不受代码位置的限制。
这样的好处是什么?我可以在一瞬间添加无数个任务,但执行的时候我可以控制想让哪几个任务一起执行,就可以一起执行,不想让谁执行,谁就执行不了,想什么时机执行就什么时机执行。实现异步串行。
那可能有人会问,就凭这两段代码好像做不到异步串行吧。确实,这怎么解决?
有一个方法就是可以给任务加标识,即标识为1的都可以同时执行,标识为2的必须等所有的1结束了才能执行。标识为2的任务怎么知道标识1的任务什么时候结束呢?当然要让标识1的任务主动通知,设置检查函数,每一个标识1的任务结束后都主动调用检查函数,发现全部执行完毕后,就开始调用标识2的任务,这样就实现了异步串行了。
补:因为js是单线程,不会存在线程并发资源抢占的问题,一定有一个任务会检测到所有任务都结束了。在多线程编程的语言中,需要在检查的函数中加把锁。
加标识的方法存在局限性,哪里有那么多标识可加,代码得多混乱.....
因此我们又引入了任务组的概念:即可以同时一起执行的任务把它们归成一组,这样就会分成很多组,这个时候把任务组看成一个不可分割的整体,保证任务组的有序执行即可。如何保证有序,构建任务组队列,或者任务组链表,依次执行即可。
再分析任务组内的所有任务如何保证全部执行完毕:让每个任务结束都通知任务组检查,总会有一个任务检查的时候发现任务都结束了
代码如下:
- /**
- * 任务组
- * @constructor
- */
- function TaskGroup() {
- this.nextGroup=null;//下一个任务组
- this.taskList=[];//任务列表
- this.add=(task)=>{
- this.taskList.push(task);
- }
- this.setNextGroup=(group)=>{
- this.nextGroup=group;
- }
- /**
- * 启动任务组
- */
- this.start=()=>{
- for (let i = 0; i < this.taskList.length; i++) {
- const task=this.taskList[i];
- if (task.status === TaskStatusEnum.INIT) {
- task.execute();//执行
- }
- }
- }
- /**
- * 检查任务
- */
- this.check=()=>{
- for (let i = 0; i < this.taskList.length; i++) {
- if (this.taskList[i].status !== TaskStatusEnum.FINISH) {
- return; //发现还有任务没有执行完成
- }
- }
- if(this.nextGroup)this.nextGroup.start();//任务全部执行完成,进行下一个任务组
- }
-
- }
最后创建一个任务中心,用来提供对外接口,将任务封装成一个个任务,组成任务组,控制启动结束等等。
- /**
- * 任务执行状态枚举类
- * @type {{INIT: number, RUNNING: number, FINISH: number}}
- */
- const TaskStatusEnum = {
- INIT: -1, //初始化
- RUNNING: 0, //执行中
- FINISH: 1 //已完成
- }
-
- /**
- * 任务单元
- * @param taskId 任务id
- * @param task 任务函数
- * @param group 任务组对象
- * @constructor
- */
- function TaskUnit(taskId, task, group) {
- this.taskId = taskId; //任务唯一标识,暂时没用到
- this.status = TaskStatusEnum.INIT; //执行状态:-1初始化 0执行中 1已完成
- this.task = task; //任务内容
- /**
- * 执行任务内容
- */
- this.execute = () => {
- this.status = TaskStatusEnum.RUNNING;
- this.task(this.end);
- }
- /**
- * 任务完成回调
- */
- this.end = () => {
- if (this.status === TaskStatusEnum.RUNNING) {
- this.status = TaskStatusEnum.FINISH
- group.check();
- }
- }
- }
-
- /**
- * 任务组
- * @constructor
- */
- function TaskGroup() {
- this.nextGroup=null;//下一个任务组
- this.taskList=[];//任务列表
- this.add=(task)=>{
- this.taskList.push(task);
- }
- this.setNextGroup=(group)=>{
- this.nextGroup=group;
- }
- /**
- * 启动任务组
- */
- this.start=()=>{
- for (let i = 0; i < this.taskList.length; i++) {
- const task=this.taskList[i];
- if (task.status === TaskStatusEnum.INIT) {
- task.execute();//执行
- }
- }
- }
- /**
- * 检查任务
- */
- this.check=()=>{
- for (let i = 0; i < this.taskList.length; i++) {
- if (this.taskList[i].status !== TaskStatusEnum.FINISH) {
- return; //发现还有任务没有执行完成
- }
- }
- //任务全部执行完成,进行下一个任务组
- this.taskList=[] //清空当前任务组,让这些任务尽快回收
- if(this.nextGroup){
- let nextGroupTemp=this.nextGroup;
- nextGroupTemp.start();
- this.nextGroup=null;//取消引用,让当前任务组尽快回收
- }
- }
-
- }
-
- /**
- * 异步函数控制器,
- * .and() 添加异步并行函数
- * .next() 添加同步串行函数
- */
- module.exports = function () {
- this.queue = [];
- this.nowTaskGroup=null;//当前任务组
- this.startCount = 0;//加入任务数
-
- /**
- * 调用该函数表示添加并行任务
- * @param task 任务
- */
- this.and = (task) => {
- if(this.nowTaskGroup==null){
- this.nowTaskGroup=new TaskGroup();
- }
- this.nowTaskGroup.add(new TaskUnit(++this.startCount, task, this.nowTaskGroup))
- return this
- }
- /**
- * 调用该函数表示添加串行任务
- * @param task 任务
- */
- this.next = (task) => {
- if(this.nowTaskGroup!=null) this.queue.push(this.nowTaskGroup);//防止上一个添加的任务是并行的
- this.nowTaskGroup=new TaskGroup();
- this.nowTaskGroup.add(new TaskUnit(++this.startCount, task, this.nowTaskGroup));
- this.queue.push(this.nowTaskGroup);
- this.nowTaskGroup=null;//当前任务添加结束清空
- return this
- }
- /**
- * 调用该函数表示任务添加完毕,开始执行任务
- * @param endTask 任务全部结束后回调
- */
- this.finish = (endTask) => {
- if(this.nowTaskGroup!=null) this.queue.push(this.nowTaskGroup);
- this.nowTaskGroup=new TaskGroup();
- this.nowTaskGroup.add(new TaskUnit(++this.startCount, endTask, this.nowTaskGroup));
- this.queue.push(this.nowTaskGroup);
- this.nowTaskGroup=null;//当前任务添加结束清空
-
- //组装成单向链表
- for(let i=0;i<this.queue.length-1;i++){
- this.queue[i].setNextGroup(this.queue[i+1]);
- }
- this.queue[0].start();//启动链表首个任务组
- this.queue=[] //清空任务,为下一波任务做准备
- }
- }
- const Controller=require('./src/util/asyncController')
-
- const controller=new Controller();
- controller.and(end=>{
- setTimeout(()=>{
- console.log("并行1")
- end();
- },2000);
- }).and(end=>{
- setTimeout(()=>{
- console.log("并行2")
- end();
- },2000);
- }).and(end=>{
- setTimeout(()=>{
- console.log("并行3")
- end();
- },2000);
- }).next(end=>{
- setTimeout(()=>{
- console.log("串行1")
- end();
- },2000);
- }).next(end=>{
- setTimeout(()=>{
- console.log("串行2")
- end();
- },2000);
- }).and(end=>{
- setTimeout(()=>{
- console.log("并行4")
- end();
- },2000);
- }).and(end=>{
- setTimeout(()=>{
- console.log("并行5")
- end();
- },2000);
- }).next(end=>{
- setTimeout(()=>{
- console.log("串行3")
- end();
- },2000);
- }).finish(()=>{
- setTimeout(()=>{
- console.log("结束")
- },2000);
- })
执行结果
"C:\Program Files\nodejs\node.exe" main.js
并行1
并行2
并行3
串行1
串行2
并行4
并行5
串行3
结束Process finished with exit code 0
--------------------------------------------------
2024/01/06更新
代码格式优化
- /**
- * 任务执行状态枚举类
- * @type {{INIT: number, RUNNING: number, FINISH: number}}
- */
- const TaskStatusEnum = {
- INIT: -1, //初始化
- RUNNING: 0, //执行中
- FINISH: 1 //已完成
- }
-
- /**
- * 任务单元
- */
- class TaskUnit {
- /**
- * 任务唯一标识,暂时没用到
- * @type {String}
- */
- taskId; //
- /**
- * 任务状态
- * @type {Number}
- */
- status;
- /**
- * 任务执行函数
- * @type {Function}
- */
- task; //任务内容
- /**
- * 任务所在组
- * @type {TaskGroup}
- */
- group;
-
- /**
- * 任务单元
- * @param taskId 任务id
- * @param task 任务函数
- * @param group 任务组对象
- * @constructor
- */
- constructor(taskId, task, group) {
- this.taskId = taskId;
- this.status = TaskStatusEnum.INIT;
- this.task = task;
- this.group = group;
- }
-
- /**
- * 执行任务内容
- */
- execute = () => {
- this.status = TaskStatusEnum.RUNNING;
- return this.task(this.end);
- }
- /**
- * 任务完成回调
- */
- end = () => {
- if (this.status === TaskStatusEnum.RUNNING) {
- this.status = TaskStatusEnum.FINISH
- }
- return this.group.check();
- }
-
- }
-
-
- /**
- * 任务组
- * 任务组之间是串行执行的
- * 一个任务组内部的任务是并行执行的,直至所有任务都完成,任务组结束
- */
- class TaskGroup {
- /**
- * 下一个任务组
- * @type {TaskGroup}
- */
- nextGroup;
- /**
- * 当前任务组所有任务
- * @type {[TaskUnit]}
- */
- taskList = [];
-
- /**
- * 任务组完成后回调
- * @type {Function}
- */
- finish;
-
- /**
- *
- * @param {Function} finish
- */
- setFinishCallback(finish) {
- this.finish = finish;
- }
-
- add = (task) => {
- this.taskList.push(task);
- }
-
- setNextGroup = (group) => {
- this.nextGroup = group;
- }
- /**
- * 启动任务组
- */
- start = () => {
- this.taskList.forEach(e=>{
- if (e.status === TaskStatusEnum.INIT) {
- e.execute();//执行
- }
- })
- }
-
- /**
- * 检查任务
- */
- check() {
- /** 检查任务是否均执行完成 **/
- for (let i = 0; i < this.taskList.length; i++) {
- if (this.taskList[i].status !== TaskStatusEnum.FINISH) {
- return; //发现还有任务没有执行完成
- }
- }
- this.taskList = null;
- /** 务全部执行完成,进行下一个任务组,没有下一个任务组,就执行完成的回调函数 **/
- if (this.nextGroup) {
- let nextGroupTemp = this.nextGroup;
- this.nextGroup = null;
- return nextGroupTemp.start();
- } else {
- if (this.finish){
- return this.finish(); //所有任务组都结束了
- }
- }
- }
- }
-
- /**
- * 同步控制器
- */
- class Asyncor {
- /**
- * 任务组队列
- * @type {[TaskGroup]}
- */
- queue = [];
- /**
- * 当时任务组
- * @type {TaskGroup}
- */
- nowTaskGroup;
- /**
- * 加入任务数
- * @type {number}
- */
- startCount = 0;
-
- /**
- * 向当前任务组添加并行任务,没有任务组则创建一个
- * @param task 任务
- */
- add = (task) => {
- if (this.nowTaskGroup == null) {
- this.nowTaskGroup = new TaskGroup();
- }
- if (task) {
- /** 添加任务 */
- this.nowTaskGroup.add(new TaskUnit(++this.startCount, task, this.nowTaskGroup))
- }
- return this
- }
- /**
- * 本着上一步必须完成之后才能进行下一步的原则,当前函数为新的下一步。
- * 应将之前添加的任务与当前任务分离开,创建一个新的任务组,添加当前步骤的任务
- * 可通过add函数在当前步骤中继续追加并行任务
- * 或者继续调用该next进行添加下下一步
- * @param task 任务
- */
- next = (task) => {
- /** 看看有没有上一步任务,有就分开 */
- if (this.nowTaskGroup != null) {
- this.queue.push(this.nowTaskGroup);
- }
- /** 每次调用该函数都会创建一个单独的任务组,与上一步任务隔离开 */
- this.nowTaskGroup = new TaskGroup();
- if (task) {
- /** 添加任务 */
- this.nowTaskGroup.add(new TaskUnit(++this.startCount, task, this.nowTaskGroup));
- }
- return this
- }
- /**
- * 调用该函数表示任务添加完毕,开始执行任务
- * @param {Function}finish 任务全部结束后回调
- */
- start = (finish) => {
- /** 将启动前最后的任务组打包 **/
- if (this.nowTaskGroup != null) {
- this.queue.push(this.nowTaskGroup);
- }
- this.nowTaskGroup = null;
-
- /** 所有任务组组装成单向链表 */
- for (let i = 0; i < this.queue.length - 1; i++) {
- this.queue[i].setNextGroup(this.queue[i + 1]);
- }
-
- /** 给最后一个任务组 设置全部完成的回调函数 **/
- this.queue[this.queue.length - 1].setFinishCallback(finish);
-
- /** 启动链表首个任务组 */
- const firstGroup = this.queue[0];
- /** 这里清空任务队列,可以使当前Asyncor对象可以复用 */
- this.queue = []
- return firstGroup.start();//启动链表首个任务组
- }
-
- }
-
- module.exports = Asyncor
-
提供三个函数
asyncor.add() asyncor.next() asyncor.start()
- asyncor.add(function (end) {
- setTimeout(()=>{
- //自己的异步代码
- end();//通知控制器当前工作已完成
- })
- })
-
- asyncor.next(function (end) {
- setTimeout(()=>{
- //自己的异步代码
- end();//通知控制器当前工作已完成
- })
- })
-
- asyncor.start(function () {
- //整个任务都完成之后会调用这里,可以在这里自定义功能
- });
支持链式调用
asyncor.add().add().next().next().add().next().add().start()
如何使用?如图表示任务计划执行情况,每一个单元格表示一个任务,横向的表示并发执行,纵向表示任务批次,批次需要串行执行。
第1批次 | 1 | 2 | 3 |
第2批次 | 4 | 5 | |
第3批次 | 6 | ||
第4批次 | 7 | ||
第5批次 | 8 | 9 | 10 |
对应代码示例
- //先创建对象
- const asyncor=new Asyncor();
-
-
-
- /** 第一批次 */
- asyncor.add(end=>{
- //任务1
- console.log("任务1")
-
- end();
- }).add(end=>{
- //任务2
- console.log("任务2")
-
- end();
- }).add(end=>{
- //任务3
- console.log("任务3")
-
- end();
- });
-
-
- /** 第二批次 */
- asyncor.next(end=>{
- //任务4
- console.log("任务4")
-
- end();
- }).add(end=>{
- //任务5
- console.log("任务5")
-
- end();
- });
-
-
-
- /** 第三批次 */
- asyncor.next(end=>{
- //任务6
- console.log("任务6")
-
- end();
- });
-
-
-
- /** 第四批次 */
- asyncor.next(end=>{
- //任务7
- console.log("任务7")
-
- end();
- });
-
-
-
- /** 第五批次 */
- asyncor.next(end=>{
- //任务8
- console.log("任务8")
-
- end();
- }).add(end=>{
- //任务9
- console.log("任务9")
-
- end();
- }).add(end=>{
- //任务10
- console.log("任务10")
-
- end();
- });
-
-
-
- /** 启动 */
- asyncor.start(()=>{
- console.log("执行完成");
- });
-
-
-
- 输出:
- 任务1
- 任务2
- 任务3
- 任务4
- 任务5
- 任务6
- 任务7
- 任务8
- 任务9
- 任务10
- 执行完成
-