const limitFn = (limit) => {
// 这两个变量相对于下面函数执行是闭包 局部全局变量
// 存储promise异步的任务队列
const queue = [];
// 用于标记当前队列里Item个数
let activeCount = 0;
// 逐步释放队列任务
const next = () => {
activeCount--;
if (queue.length > 0) {
queue.shift()();
}
};
// 执行器
const run = async (fn, resolve, reject, ...args) => {
// 每次加入一个任务就+1
activeCount++;
// 这里 fn(...args)返回一个Promise
try {
const res = await fn(...args);
// 每一个任务都有独自的resolve将执行结果吐出去
resolve(res);
} catch (err) {
// 这里将报错函数抛出去 以便再次执行
reject(fn)
}
// 每执行完一个任务 就从队列中再释放一个出来执行
next();
};
const enqueue = (fn, resolve, reject, ...args) => {
/*
run.bind 既能把实施参数传进函数内又不立即执行 并返回一个新的函数
将新函数放进去任务队列 暂不执行
*/
queue.push(run.bind(null, fn, resolve, reject, ...args));
// 如果任务队列数量小于限制数量且任务队列有任务则 出队一个任务执行
// 如果进队列的速度大于某个任务执行过程 则会限流执行 但会加进任务队列中等待执行 相当于并发执行的数量始终控制在limit内
if (activeCount < limit && queue.length > 0) {
queue.shift()();
}
};
// 返回最终结果promise
const generator = (fn, ...args) =>
new Promise((resolve, reject) => {
enqueue(fn, resolve, reject, ...args);
});
return generator;
};
let runWay = limitFn(5);
/* 模拟并发异步请求 100条 */
let workNum = 100;
// 异步函数list
let asyncFnList = [];
// 并发请求完的结果list
let resultList = [];
// 报错后的报错异步请求list
let errorList = [];
// 模拟请求
let fakeFn = () => {
return new Promise((resolve, reject) => {
let time = Math.random() * 1000
setTimeout(() => {
resolve(time)
}, time);
})
}
// 异步任务进队列
for (let i = 0; i < workNum; i++) {
asyncFnList.push(fakeFn)
}
const isFinished = () => (resultList.length + errorList.length) == workNum
const doNotifi = () => {
console.log('所有异步任务都执行完:' + JSON.stringify(resultList))
}
// 模拟并发100条请求
asyncFnList.forEach(async fn => {
try {
let res = await runWay(fn);
console.log('单个异步任务执行完:' + res)
resultList.push(res);
// 判断是否所有异步任务都执行完毕
isFinished() && doNotifi()
} catch (errorFn) {
errorList.push(errorFn)
}
})
// 如果有报错异步任务 则重新执行
// TODO 这里可以对报错超过3次的异步任务进行剔除
while (errorList.length > 0) {
try {
let res = runWay(errorList.shift());
resultList.push(res);
// 判断是否所有异步任务都执行完毕
isFinished() && doNotifi()
} catch (errorFn) {
errorList.push(errorFn)
}
}