• JavaScript 任务池


    JavaScript 任务池

    本文写于 2022 年 5 月 13 日

    线程池

    在多线程语言中,我们通常不会随意的在需要启动线程的时候去启动,而是会选择创建一个线程池。

    所谓线程池,本意其实就是(不止这些作用,其余作用可以自行查阅):

    1. 节省操作系统资源
    2. 限制最大线程数。

    对于 JavaScript 来说,虽然不存在“启动线程”这种问题,但我们还是可以通过类似的思想,来限制我们做异步操作的数量

    例如我们同时将进行 1000 个异步请求,那么我们可以约束同时执行的最大异步操作数为 100。

    第一次我们执行 100 个异步任务,还剩余 900 个未执行。接着每当前面的异步任务有执行完毕之后,我们就会将空出来的位置执行下一个异步任务。

    分析

    首先我们需要一个数组,用它来存储尚未执行的任务,每个任务都是一个函数,这个函数必须要返回一个 Promise。

    type Task = () => Promise<unknown>;
    
    const tasks: Task[] = [];
    

    其次我们需要一个方法来进行任务的添加。

    function addTask(task: Task): void;
    

    最后我们需要一个函数来执行我们所有的 task。

    而在这之前,我们还需要定义一个值,来定义同时执行的异步任务的最大数量。

    function execTasks(): void;
    

    实现

    根据我们的分析,我们可以写下基础的代码如下:

    interface TaskPool {
      addTask(task: Task): void;
    }
    
    type Task = () => Promise<unknown>;
    
    function newTaskPool(max = 10): TaskPool {
      const tasks: Task[] = [];
    
      function addTask(task: Task): void {}
    
      function execTasks(): void {}
    }
    

    新增任务非常简单,我们写出如下代码填充 addTask

    function addTask(task: Task): void {
      tasks.push(task);
    }
    

    接下来就是重头戏。如何实现 execTasks 方法来限制最大异步任务数量呢?

    首先我们来明确一点,在下面这个场景中,如果 foo 函数是异步操作,那么是不会阻塞我们的代码执行的。

    console.log("Before");
    foo();
    console.log("After");
    

    那么我们可以这么操作:

    1. 定义一个变量用来记录当前的空闲任务数量;
    2. 执行 execTasks 时,会选取当前任务数量和空闲任务数二者相比较小的一个;
    3. 根据该值进行循环,每次循环弹出 tasks 第一位的任务进行执行;
    4. 执行前将空闲任务数 -1,执行完毕后空闲任务数 +1,并再次执行 execTasks
    let leisure = max;
    
    function execTasks(): void {
      if (tasks.length === 0) return;
    
      const execTaskNum = Math.min(tasks.length, leisure);
      for (let i = 0; i < execTaskNum; i++) {
        const task = tasks.shift();
        if (!task) continue;
    
        leisure--;
        task().finally(() => {
          leisure++;
          execTasks();
        });
      }
    }
    

    最后我们只剩下了一个问题了,我们如何在 addTask 后执行 execTasks,但又不会让下面这种情况导致频繁执行 execTasks

    for (let i = 0; i < 100; i++) addTask();
    

    可以利用防抖 + setTimeout(() => {},0) 的特性来完成。

    function addTask(task: Task) {
      tasks.push(task);
      execTasksAfterAdd();
    }
    
    // 这里借用了 lodash 的 debounce 函数,具体实现不多说,可以看我以前的文章:防抖与节流
    const execTasksAfterAdd = debounce(execTasks);
    

    完整代码:

    import { debounce } from "lodash";
    
    interface TaskQueue {
      addTask: (task: () => Promise<any>) => void;
    }
    
    function newTaskQueue(maxTaskNum = 10): TaskQueue {
      let _leisure = maxTaskNum;
    
      const _tasks: Array<() => Promise<any>> = [];
    
      function addTask(task: () => Promise<any>) {
        _tasks.push(task);
        execAfterTask();
      }
    
      const execAfterTask = debounce(execTasks);
    
      function execTasks() {
        if (_tasks.length === 0) return;
    
        const execTaskNum = Math.min(_tasks.length, _leisure);
        for (let i = 0; i < execTaskNum; i++) {
          const task = _tasks.shift();
          if (!task) continue;
    
          _leisure--;
          task().finally(() => {
            _leisure++;
            execTasks();
          });
        }
      }
    
      return { addTask };
    }
    
    const queue = newTaskQueue(5);
    
    for (let i = 0; i < 10; i++) {
      queue.addTask(function () {
        return new Promise<void>((resolve) => {
          setTimeout(() => resolve(), 800);
        });
      });
    }
    

    使用场景

    其实这种做法的使用场景是比较少的。

    绝大多数情况我们都不需要这么去做,除非碰到很极端的需求。

    例如我们需要用 Node.js 去设计一个吞吐量极大的服务,那么同时发生大量的网络请求很可能把带宽直接打满,导致后续的请求无法打到该服务,此时就可以使用任务池来控制最大网络请求量。

    (完)

  • 相关阅读:
    Scrum敏捷开发的的优势和实施中的一些挑战
    带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
    Hadoop3:MapReduce之简介、WordCount案例源码阅读、简单功能开发
    拓展:赖世雄英语初级美语(上)
    Redis 缓存穿透、缓存击穿、缓存雪崩
    如何在表格里面添加表单,并且进行表单验证
    java SpringBoot登录验证token拦截器
    Dom.nodeType
    模型部署时的调试技巧,debug方法
    【JavaSE专栏90】用最简单的方法,使用 JDBC 连接 MySQL 数据库
  • 原文地址:https://www.cnblogs.com/xhyccc/p/16268522.html