• js异步控制器,实现异步同步代码交替穿插执行


    一、js代码中发起请求,读取文件等操作都是异步执行,如果我们希望将多个异步函数的结果都收集起来,统一处理,这应该怎么做呢?

    第一种方法,需要用async和await关键字将异步函数强制为同步状态,这样就可以同步获取每一个异步函数的结果了。(耗时长)

    第二种就是使用Promise对象,将每个函数都装到Promise对象中,按照顺序放到.then方法中,变成串行执行,(耗时长)

    第三种方法,需要定义全局变量,标记每个异步函数的执行结果状态,定义全局容器收集异步函数执行结果,定义全局检查函数检查执行结果状态,然后在每一个异步函数的完成的时候修改该状态,并且调用检查方法,在全部结束的时候表示异步函数结果收集完成。(代码复杂度高)

    以上方式即可解决

    二、如果希望多个异步并行,全部结束之后又需要连续几个异步串行,之后又希望多个异步函数并行,又希望多个异步串行等等,这样的话,通过上面的方法,也可以解决,但代码都会变得特别复杂,难以维护,怎么办?

    于是乎,想到应该得有这么一个异步函数控制器,想串行就串行,想并行就并行,而且代码复杂度还得降低,网上搜索半天没有找到现成的代码,或者就是晦涩难懂。

    思路如下:

    每一个异步函数都看做成一个任务,当然同步函数亦可,这里借鉴批量程序的任务单元概念:

    一个任务定义简单的生命周期,即任务初始化,任务执行中,任务结束。

    这里先定义好枚举常量,增强代码易读性

    1. /**
    2. * 任务执行状态枚举类
    3. * @type {{INIT: number, RUNNING: number, FINISH: number}}
    4. */
    5. const TaskStatusEnum = {
    6. INIT: -1, //初始化
    7. RUNNING: 0, //执行中
    8. FINISH: 1 //已完成
    9. }

    我们现在面向异步函数,如何知道异步函数这个任务什么时候结束?代码执行完最后一行就算结束?不对吧,代码执行完最后一行,异步函数似乎并没有结束。

    而且js中也没办法去检测一个函数到底有没有结束,就算可以,那什么时机去检测呢?定时轮训可以不,异步的不确定性,定多少时间合适呢?1秒检测一次?还是1毫秒一次?检测频率越慢,这中间会造成延迟越大,检测越快,越影响代码执行效率,因为js是单线程。

    我在刚学js的时候,会碰到这些疑问,原因是习惯性按照同步的思路去思考问题。

    但js有个概念:事件驱动。比如:根据工作任务分配原则,A同事的工作完成后,我们要让A主动汇报自己已经完成工作了,而不是每次让我们一次又一次观察A的情况。A的汇报就是事件驱动,我们给A交代了一个事情(每次工作完要主动汇报)。

    回归正题:我们需要让异步函数工作完成,主动通知自己工作已经完成。这里又是一个面向对象编程的思想,我们定义一个任务类:

    类的实例化被看做任务初始化

    类中提供一个开始执行的方法,控制任务的执行时机,修改自己的生命周期为正在执行

    类中还提供一个结束的方法,用于任务执行过程中随时可主动修改自己的生命周期为已结束

    代码如下:

    1. /**
    2. * 任务单元
    3. * @param taskId 任务id
    4. * @param task 任务函数
    5. * @param group 任务组对象
    6. * @constructor
    7. */
    8. function TaskUnit(taskId, task, group) {
    9. this.taskId = taskId; //任务唯一标识
    10. this.status = TaskStatusEnum.INIT; //执行状态:-1初始化 0执行中 1已完成
    11. this.task = task; //任务内容
    12. /**
    13. * 执行任务内容
    14. */
    15. this.execute = () => {
    16. this.status = TaskStatusEnum.RUNNING;
    17. this.task(this.end);
    18. }
    19. /**
    20. * 任务完成回调
    21. */
    22. this.end = () => {
    23. if (this.status === TaskStatusEnum.RUNNING) {
    24. this.status = TaskStatusEnum.FINISH
    25. group.check();
    26. }
    27. }
    28. }

    额(⊙o⊙)…目测这个类,好像和Promise长得差不多。少了异常捕获catch和finally两个函数,这两个不是我任务单元所关心的,因此这些东西交给实际任务内容去处理。

    多的是什么呢?任务并不是实例化之后被立刻调用,而是通过execute函数,让任务可在任意时机被调用,不受代码位置的限制。

    这样的好处是什么?我可以在一瞬间添加无数个任务,但执行的时候我可以控制想让哪几个任务一起执行,就可以一起执行,不想让谁执行,谁就执行不了,想什么时机执行就什么时机执行。实现异步串行。

    那可能有人会问,就凭这两段代码好像做不到异步串行吧。确实,这怎么解决?

    有一个方法就是可以给任务加标识,即标识为1的都可以同时执行,标识为2的必须等所有的1结束了才能执行。标识为2的任务怎么知道标识1的任务什么时候结束呢?当然要让标识1的任务主动通知,设置检查函数,每一个标识1的任务结束后都主动调用检查函数,发现全部执行完毕后,就开始调用标识2的任务,这样就实现了异步串行了。

    补:因为js是单线程,不会存在线程并发资源抢占的问题,一定有一个任务会检测到所有任务都结束了。在多线程编程的语言中,需要在检查的函数中加把锁。

    加标识的方法存在局限性,哪里有那么多标识可加,代码得多混乱.....

    因此我们又引入了任务组的概念:即可以同时一起执行的任务把它们归成一组,这样就会分成很多组,这个时候把任务组看成一个不可分割的整体,保证任务组的有序执行即可。如何保证有序,构建任务组队列,或者任务组链表,依次执行即可。

    再分析任务组内的所有任务如何保证全部执行完毕:让每个任务结束都通知任务组检查,总会有一个任务检查的时候发现任务都结束了

    代码如下:

    1. /**
    2. * 任务组
    3. * @constructor
    4. */
    5. function TaskGroup() {
    6. this.nextGroup=null;//下一个任务组
    7. this.taskList=[];//任务列表
    8. this.add=(task)=>{
    9. this.taskList.push(task);
    10. }
    11. this.setNextGroup=(group)=>{
    12. this.nextGroup=group;
    13. }
    14. /**
    15. * 启动任务组
    16. */
    17. this.start=()=>{
    18. for (let i = 0; i < this.taskList.length; i++) {
    19. const task=this.taskList[i];
    20. if (task.status === TaskStatusEnum.INIT) {
    21. task.execute();//执行
    22. }
    23. }
    24. }
    25. /**
    26. * 检查任务
    27. */
    28. this.check=()=>{
    29. for (let i = 0; i < this.taskList.length; i++) {
    30. if (this.taskList[i].status !== TaskStatusEnum.FINISH) {
    31. return; //发现还有任务没有执行完成
    32. }
    33. }
    34. if(this.nextGroup)this.nextGroup.start();//任务全部执行完成,进行下一个任务组
    35. }
    36. }

    最后创建一个任务中心,用来提供对外接口,将任务封装成一个个任务,组成任务组,控制启动结束等等。

    三、总代码如下:asyncController.js

    1. /**
    2. * 任务执行状态枚举类
    3. * @type {{INIT: number, RUNNING: number, FINISH: number}}
    4. */
    5. const TaskStatusEnum = {
    6. INIT: -1, //初始化
    7. RUNNING: 0, //执行中
    8. FINISH: 1 //已完成
    9. }
    10. /**
    11. * 任务单元
    12. * @param taskId 任务id
    13. * @param task 任务函数
    14. * @param group 任务组对象
    15. * @constructor
    16. */
    17. function TaskUnit(taskId, task, group) {
    18. this.taskId = taskId; //任务唯一标识,暂时没用到
    19. this.status = TaskStatusEnum.INIT; //执行状态:-1初始化 0执行中 1已完成
    20. this.task = task; //任务内容
    21. /**
    22. * 执行任务内容
    23. */
    24. this.execute = () => {
    25. this.status = TaskStatusEnum.RUNNING;
    26. this.task(this.end);
    27. }
    28. /**
    29. * 任务完成回调
    30. */
    31. this.end = () => {
    32. if (this.status === TaskStatusEnum.RUNNING) {
    33. this.status = TaskStatusEnum.FINISH
    34. group.check();
    35. }
    36. }
    37. }
    38. /**
    39. * 任务组
    40. * @constructor
    41. */
    42. function TaskGroup() {
    43. this.nextGroup=null;//下一个任务组
    44. this.taskList=[];//任务列表
    45. this.add=(task)=>{
    46. this.taskList.push(task);
    47. }
    48. this.setNextGroup=(group)=>{
    49. this.nextGroup=group;
    50. }
    51. /**
    52. * 启动任务组
    53. */
    54. this.start=()=>{
    55. for (let i = 0; i < this.taskList.length; i++) {
    56. const task=this.taskList[i];
    57. if (task.status === TaskStatusEnum.INIT) {
    58. task.execute();//执行
    59. }
    60. }
    61. }
    62. /**
    63. * 检查任务
    64. */
    65. this.check=()=>{
    66. for (let i = 0; i < this.taskList.length; i++) {
    67. if (this.taskList[i].status !== TaskStatusEnum.FINISH) {
    68. return; //发现还有任务没有执行完成
    69. }
    70. }
    71. //任务全部执行完成,进行下一个任务组
    72. this.taskList=[] //清空当前任务组,让这些任务尽快回收
    73. if(this.nextGroup){
    74. let nextGroupTemp=this.nextGroup;
    75. nextGroupTemp.start();
    76. this.nextGroup=null;//取消引用,让当前任务组尽快回收
    77. }
    78. }
    79. }
    80. /**
    81. * 异步函数控制器,
    82. * .and() 添加异步并行函数
    83. * .next() 添加同步串行函数
    84. */
    85. module.exports = function () {
    86. this.queue = [];
    87. this.nowTaskGroup=null;//当前任务组
    88. this.startCount = 0;//加入任务数
    89. /**
    90. * 调用该函数表示添加并行任务
    91. * @param task 任务
    92. */
    93. this.and = (task) => {
    94. if(this.nowTaskGroup==null){
    95. this.nowTaskGroup=new TaskGroup();
    96. }
    97. this.nowTaskGroup.add(new TaskUnit(++this.startCount, task, this.nowTaskGroup))
    98. return this
    99. }
    100. /**
    101. * 调用该函数表示添加串行任务
    102. * @param task 任务
    103. */
    104. this.next = (task) => {
    105. if(this.nowTaskGroup!=null) this.queue.push(this.nowTaskGroup);//防止上一个添加的任务是并行的
    106. this.nowTaskGroup=new TaskGroup();
    107. this.nowTaskGroup.add(new TaskUnit(++this.startCount, task, this.nowTaskGroup));
    108. this.queue.push(this.nowTaskGroup);
    109. this.nowTaskGroup=null;//当前任务添加结束清空
    110. return this
    111. }
    112. /**
    113. * 调用该函数表示任务添加完毕,开始执行任务
    114. * @param endTask 任务全部结束后回调
    115. */
    116. this.finish = (endTask) => {
    117. if(this.nowTaskGroup!=null) this.queue.push(this.nowTaskGroup);
    118. this.nowTaskGroup=new TaskGroup();
    119. this.nowTaskGroup.add(new TaskUnit(++this.startCount, endTask, this.nowTaskGroup));
    120. this.queue.push(this.nowTaskGroup);
    121. this.nowTaskGroup=null;//当前任务添加结束清空
    122. //组装成单向链表
    123. for(let i=0;i<this.queue.length-1;i++){
    124. this.queue[i].setNextGroup(this.queue[i+1]);
    125. }
    126. this.queue[0].start();//启动链表首个任务组
    127. this.queue=[] //清空任务,为下一波任务做准备
    128. }
    129. }

    四、使用样例:

    1. const Controller=require('./src/util/asyncController')
    2. const controller=new Controller();
    3. controller.and(end=>{
    4. setTimeout(()=>{
    5. console.log("并行1")
    6. end();
    7. },2000);
    8. }).and(end=>{
    9. setTimeout(()=>{
    10. console.log("并行2")
    11. end();
    12. },2000);
    13. }).and(end=>{
    14. setTimeout(()=>{
    15. console.log("并行3")
    16. end();
    17. },2000);
    18. }).next(end=>{
    19. setTimeout(()=>{
    20. console.log("串行1")
    21. end();
    22. },2000);
    23. }).next(end=>{
    24. setTimeout(()=>{
    25. console.log("串行2")
    26. end();
    27. },2000);
    28. }).and(end=>{
    29. setTimeout(()=>{
    30. console.log("并行4")
    31. end();
    32. },2000);
    33. }).and(end=>{
    34. setTimeout(()=>{
    35. console.log("并行5")
    36. end();
    37. },2000);
    38. }).next(end=>{
    39. setTimeout(()=>{
    40. console.log("串行3")
    41. end();
    42. },2000);
    43. }).finish(()=>{
    44. setTimeout(()=>{
    45. console.log("结束")
    46. },2000);
    47. })

     执行结果

    "C:\Program Files\nodejs\node.exe" main.js
    并行1
    并行2
    并行3
    串行1
    串行2
    并行4
    并行5
    串行3
    结束

    Process finished with exit code 0

    --------------------------------------------------

    2024/01/06更新

    代码格式优化

    1. /**
    2. * 任务执行状态枚举类
    3. * @type {{INIT: number, RUNNING: number, FINISH: number}}
    4. */
    5. const TaskStatusEnum = {
    6. INIT: -1, //初始化
    7. RUNNING: 0, //执行中
    8. FINISH: 1 //已完成
    9. }
    10. /**
    11. * 任务单元
    12. */
    13. class TaskUnit {
    14. /**
    15. * 任务唯一标识,暂时没用到
    16. * @type {String}
    17. */
    18. taskId; //
    19. /**
    20. * 任务状态
    21. * @type {Number}
    22. */
    23. status;
    24. /**
    25. * 任务执行函数
    26. * @type {Function}
    27. */
    28. task; //任务内容
    29. /**
    30. * 任务所在组
    31. * @type {TaskGroup}
    32. */
    33. group;
    34. /**
    35. * 任务单元
    36. * @param taskId 任务id
    37. * @param task 任务函数
    38. * @param group 任务组对象
    39. * @constructor
    40. */
    41. constructor(taskId, task, group) {
    42. this.taskId = taskId;
    43. this.status = TaskStatusEnum.INIT;
    44. this.task = task;
    45. this.group = group;
    46. }
    47. /**
    48. * 执行任务内容
    49. */
    50. execute = () => {
    51. this.status = TaskStatusEnum.RUNNING;
    52. return this.task(this.end);
    53. }
    54. /**
    55. * 任务完成回调
    56. */
    57. end = () => {
    58. if (this.status === TaskStatusEnum.RUNNING) {
    59. this.status = TaskStatusEnum.FINISH
    60. }
    61. return this.group.check();
    62. }
    63. }
    64. /**
    65. * 任务组
    66. * 任务组之间是串行执行的
    67. * 一个任务组内部的任务是并行执行的,直至所有任务都完成,任务组结束
    68. */
    69. class TaskGroup {
    70. /**
    71. * 下一个任务组
    72. * @type {TaskGroup}
    73. */
    74. nextGroup;
    75. /**
    76. * 当前任务组所有任务
    77. * @type {[TaskUnit]}
    78. */
    79. taskList = [];
    80. /**
    81. * 任务组完成后回调
    82. * @type {Function}
    83. */
    84. finish;
    85. /**
    86. *
    87. * @param {Function} finish
    88. */
    89. setFinishCallback(finish) {
    90. this.finish = finish;
    91. }
    92. add = (task) => {
    93. this.taskList.push(task);
    94. }
    95. setNextGroup = (group) => {
    96. this.nextGroup = group;
    97. }
    98. /**
    99. * 启动任务组
    100. */
    101. start = () => {
    102. this.taskList.forEach(e=>{
    103. if (e.status === TaskStatusEnum.INIT) {
    104. e.execute();//执行
    105. }
    106. })
    107. }
    108. /**
    109. * 检查任务
    110. */
    111. check() {
    112. /** 检查任务是否均执行完成 **/
    113. for (let i = 0; i < this.taskList.length; i++) {
    114. if (this.taskList[i].status !== TaskStatusEnum.FINISH) {
    115. return; //发现还有任务没有执行完成
    116. }
    117. }
    118. this.taskList = null;
    119. /** 务全部执行完成,进行下一个任务组,没有下一个任务组,就执行完成的回调函数 **/
    120. if (this.nextGroup) {
    121. let nextGroupTemp = this.nextGroup;
    122. this.nextGroup = null;
    123. return nextGroupTemp.start();
    124. } else {
    125. if (this.finish){
    126. return this.finish(); //所有任务组都结束了
    127. }
    128. }
    129. }
    130. }
    131. /**
    132. * 同步控制器
    133. */
    134. class Asyncor {
    135. /**
    136. * 任务组队列
    137. * @type {[TaskGroup]}
    138. */
    139. queue = [];
    140. /**
    141. * 当时任务组
    142. * @type {TaskGroup}
    143. */
    144. nowTaskGroup;
    145. /**
    146. * 加入任务数
    147. * @type {number}
    148. */
    149. startCount = 0;
    150. /**
    151. * 向当前任务组添加并行任务,没有任务组则创建一个
    152. * @param task 任务
    153. */
    154. add = (task) => {
    155. if (this.nowTaskGroup == null) {
    156. this.nowTaskGroup = new TaskGroup();
    157. }
    158. if (task) {
    159. /** 添加任务 */
    160. this.nowTaskGroup.add(new TaskUnit(++this.startCount, task, this.nowTaskGroup))
    161. }
    162. return this
    163. }
    164. /**
    165. * 本着上一步必须完成之后才能进行下一步的原则,当前函数为新的下一步。
    166. * 应将之前添加的任务与当前任务分离开,创建一个新的任务组,添加当前步骤的任务
    167. * 可通过add函数在当前步骤中继续追加并行任务
    168. * 或者继续调用该next进行添加下下一步
    169. * @param task 任务
    170. */
    171. next = (task) => {
    172. /** 看看有没有上一步任务,有就分开 */
    173. if (this.nowTaskGroup != null) {
    174. this.queue.push(this.nowTaskGroup);
    175. }
    176. /** 每次调用该函数都会创建一个单独的任务组,与上一步任务隔离开 */
    177. this.nowTaskGroup = new TaskGroup();
    178. if (task) {
    179. /** 添加任务 */
    180. this.nowTaskGroup.add(new TaskUnit(++this.startCount, task, this.nowTaskGroup));
    181. }
    182. return this
    183. }
    184. /**
    185. * 调用该函数表示任务添加完毕,开始执行任务
    186. * @param {Function}finish 任务全部结束后回调
    187. */
    188. start = (finish) => {
    189. /** 将启动前最后的任务组打包 **/
    190. if (this.nowTaskGroup != null) {
    191. this.queue.push(this.nowTaskGroup);
    192. }
    193. this.nowTaskGroup = null;
    194. /** 所有任务组组装成单向链表 */
    195. for (let i = 0; i < this.queue.length - 1; i++) {
    196. this.queue[i].setNextGroup(this.queue[i + 1]);
    197. }
    198. /** 给最后一个任务组 设置全部完成的回调函数 **/
    199. this.queue[this.queue.length - 1].setFinishCallback(finish);
    200. /** 启动链表首个任务组 */
    201. const firstGroup = this.queue[0];
    202. /** 这里清空任务队列,可以使当前Asyncor对象可以复用 */
    203. this.queue = []
    204. return firstGroup.start();//启动链表首个任务组
    205. }
    206. }
    207. module.exports = Asyncor

    提供三个函数

    asyncor.add()
    asyncor.next()
    asyncor.start()
    1. asyncor.add(function (end) {
    2. setTimeout(()=>{
    3. //自己的异步代码
    4. end();//通知控制器当前工作已完成
    5. })
    6. })
    7. asyncor.next(function (end) {
    8. setTimeout(()=>{
    9. //自己的异步代码
    10. end();//通知控制器当前工作已完成
    11. })
    12. })
    13. asyncor.start(function () {
    14. //整个任务都完成之后会调用这里,可以在这里自定义功能
    15. });

    支持链式调用

    asyncor.add().add().next().next().add().next().add().start()
    

    如何使用?如图表示任务计划执行情况,每一个单元格表示一个任务,横向的表示并发执行,纵向表示任务批次,批次需要串行执行。

    第1批次123
    第2批次45
    第3批次6
    第4批次7
    第5批次8910

    对应代码示例

    1. //先创建对象
    2. const asyncor=new Asyncor();
    3. /** 第一批次 */
    4. asyncor.add(end=>{
    5. //任务1
    6. console.log("任务1")
    7. end();
    8. }).add(end=>{
    9. //任务2
    10. console.log("任务2")
    11. end();
    12. }).add(end=>{
    13. //任务3
    14. console.log("任务3")
    15. end();
    16. });
    17. /** 第二批次 */
    18. asyncor.next(end=>{
    19. //任务4
    20. console.log("任务4")
    21. end();
    22. }).add(end=>{
    23. //任务5
    24. console.log("任务5")
    25. end();
    26. });
    27. /** 第三批次 */
    28. asyncor.next(end=>{
    29. //任务6
    30. console.log("任务6")
    31. end();
    32. });
    33. /** 第四批次 */
    34. asyncor.next(end=>{
    35. //任务7
    36. console.log("任务7")
    37. end();
    38. });
    39. /** 第五批次 */
    40. asyncor.next(end=>{
    41. //任务8
    42. console.log("任务8")
    43. end();
    44. }).add(end=>{
    45. //任务9
    46. console.log("任务9")
    47. end();
    48. }).add(end=>{
    49. //任务10
    50. console.log("任务10")
    51. end();
    52. });
    53. /** 启动 */
    54. asyncor.start(()=>{
    55. console.log("执行完成");
    56. });
    57. 输出:
    58. 任务1
    59. 任务2
    60. 任务3
    61. 任务4
    62. 任务5
    63. 任务6
    64. 任务7
    65. 任务8
    66. 任务9
    67. 任务10
    68. 执行完成

  • 相关阅读:
    opencv之坑(八)——putText中文乱码解决
    浅谈web前端工程师hr面试经典问题20+
    Java笔记(一):volatile、synchronized关键字
    C语言内存函数
    分拆计划陷入困境,英特尔还能重回巅峰吗?
    LeetCode 150. 逆波兰表达式求值
    Spring Cloud
    【JUC】循环屏障 CyclicBarrier 详解
    js中call、apply和bind:
    vcpkg安装第三方库,报错fatal error RC1107: invalid usage; use RC /? for Help
  • 原文地址:https://blog.csdn.net/qq_43319748/article/details/132746961