• 使用Promise.race()实现控制并发


    之前做过一个 Node.js实现分片上传 的功能。当时前端采用文件切片后并发上传,大大提高了上传的速度,使用Promise.race()管理并发池,有效避免浏览器内存耗尽。

    现在的问题:Node.js服务端合并大文件分片 内存耗尽导致服务重启

    服务端代码

    const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
    const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
    
    // 2. 不会阻塞EventLoop
    bufferList.forEach((hash, index) => {
      fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
           ws.write(data);
      });
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    服务器配置:RAM:1024MB 1vCPU (运行了一些其他服务)

    测试发现只要上传的文件超过300M,在合并分片时就会导致内存耗尽服务重启,根本无法完成分片合并。

    解决方案:能不能在循环中控制读取文件的并发数呢?这样就不会有大量文件同时读取到内存中导致服务崩溃了。

    尝试像前端那样使用 Promise.race 控制:

    const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
    const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
    
    // Promise.race 并发控制
    const pool = [];
    let finish = 0; // 已经写入完成的分片数量
    
    // 使用Promise包裹读取文件的异步操作
    const PR = (index) => {
      return new Promise((resolve) => {
        fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
          ws.write(data)
          resolve({});
        });
      });
    };
    
    (async function easyRead() {
      for (let i = 0; i < bufferList.length; i ++) {
        const task = PR(i).then(val => {
          finish+=1
          const index = pool.findIndex(t => t === task);
          pool.splice(index);
          if (finish === bufferList.length) {
            ws.close();
          }
        });
        pool.push(task);
        if (pool.length === 1) { // 这里并发数量只能是1 否则分片写入是乱序的 格式会被损坏
          await Promise.race(pool);
        }
      }
    })()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    这时神奇的事情就发生了,我们在for循环中使用了Promise.race()控制了同一时间读入到内存中的文件数量。

    再次测试,服务已经不会在合并分片时崩溃,即使1个G的文件分片也可以在3秒左右合并完成。

    之前做过一个 Node.js实现分片上传 的功能。当时前端采用文件切片后并发上传,大大提高了上传的速度,使用Promise.race()管理并发池,有效避免浏览器内存耗尽。

    现在的问题:Node.js服务端合并大文件分片 内存耗尽导致服务重启

    服务端代码

    const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
    const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
    
    // 2. 不会阻塞EventLoop
    bufferList.forEach((hash, index) => {
      fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
           ws.write(data);
      });
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    服务器配置:RAM:1024MB 1vCPU (运行了一些其他服务)

    测试发现只要上传的文件超过300M,在合并分片时就会导致内存耗尽服务重启,根本无法完成分片合并。

    解决方案:能不能在循环中控制读取文件的并发数呢?这样就不会有大量文件同时读取到内存中导致服务崩溃了。

    尝试像前端那样使用 Promise.race 控制:

    const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
    const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
    
    // Promise.race 并发控制
    const pool = [];
    let finish = 0; // 已经写入完成的分片数量
    
    // 使用Promise包裹读取文件的异步操作
    const PR = (index) => {
      return new Promise((resolve) => {
        fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
          ws.write(data)
          resolve({});
        });
      });
    };
    
    (async function easyRead() {
      for (let i = 0; i < bufferList.length; i ++) {
        const task = PR(i).then(val => {
          finish+=1
          const index = pool.findIndex(t => t === task);
          pool.splice(index);
          if (finish === bufferList.length) {
            ws.close();
          }
        });
        pool.push(task);
        if (pool.length === 1) { // 这里并发数量只能是1 否则分片写入是乱序的 格式会被损坏
          await Promise.race(pool);
        }
      }
    })()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    这时神奇的事情就发生了,我们在for循环中使用了Promise.race()控制了同一时间读入到内存中的文件数量。

    再次测试,服务已经不会在合并分片时崩溃,即使1个G的文件分片也可以在3秒左右合并完成。

    async、await

    相信这个是大家更加常用的处理异步的方法,因为它写起来更加优雅。我们上边的Promise.race也可以用它来替换,使代码看起来更加简洁明了:

    const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
    const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
    
    let finish = 0; // 已经写入完成的分片数量
    
    // 使用Promise包裹读取文件的异步操作
    const PR = (index) => {
      return new Promise((resolve) => {
        fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
          ws.write(data)
          resolve({});
        });
      });
    };
    
    (async function easyRead() {
      for (let i = 0; i < bufferList.length; i ++) {
        await PR(i)
        finish +=1
        if (finish === bufferList.length) {
            ws.close();
        }
      }
    })()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    可以达到相同的效果,看起来是不是清晰了很多?

    总结

    知道Promise.race()的人很多,这样的面试题也很多,但是能运用到实践中解决实际的问题却不是很多。

    希望本文可以帮到你。

    文章首发于 IICOOM-技术博客|个人博客 《使用Promise.race()实现控制并发》

  • 相关阅读:
    python并发执行request请求
    06 Kafka线上集群部署方案
    02.5 自动微分
    聊聊神经网络的优化算法
    Flutter 局部刷新
    ELK日志框架图总结
    Quartz-cron时间设置
    Hbase基本操作
    【Vue】数据监视&输入绑定
    elasticsearch 搜索引擎+elasticsearch-head+kibana windos环境安装使用手册
  • 原文地址:https://blog.csdn.net/IICOOM/article/details/126728621