并发限制的异步任务调度器
V1
/*
* @Author: CoyoteWaltz <coyote_waltz@163.com>
* @Date: 2020-08-06 22:50:17
* @LastEditTime: 2020-12-26 23:19:40
* @LastEditors: CoyoteWaltz <coyote_waltz@163.com>
* @Description: 并发限制的异步任务调度器
* @TODO:
*/
class Scheduler {
waitQueue = [];
count = 0;
constructor(limit = 2) {
this.limit = limit;
}
add(promiseCreator, ...args) {
return new Promise((resolve, reject) => {
// 把每一个任务的 resolve 闭包入这个 task 是很妙的 只有当 promiseCreator 真正执行回调的时候才调用
const task = this.createTask(promiseCreator, args, resolve, reject);
// 执行 or 排队
if (this.count < this.limit) {
task();
} else {
this.waitQueue.push(task);
}
});
}
// 封装一个 任务 fn
createTask(fn, args, resolve, reject) {
// return 一个可执行的 task 当然就是函数啦
return () => {
// 执行 就++ 可以放到第一句
this.count++;
fn(...args)
.then(resolve)
.catch(reject)
.finally(() => {
// 结束之后 让下一个等待的任务启动
this.count--;
if (this.waitQueue.length) {
const task = this.waitQueue.shift();
task();
}
});
};
}
}
const timeout = (time) =>
new Promise((resolve) => {
setTimeout(() => {
resolve(time);
}, time);
});
const scheduler = new Scheduler();
const addTask = (time, order) => {
scheduler
.add(
(x, y, z) => {
console.log(x, y, z);
return timeout(time);
},
1,
2,
3
)
.then(() => console.log(order));
};
addTask(1000, 1);
addTask(500, 2);
addTask(300, 3);
addTask(400, 4);
// 2 3 1 4
// 同时最多运行的任务只有两个
// Promise.race([
// timeout(4000).then(console.log),
// timeout(3000).then(console.log),
// ]).then((res) => console.log(res));
V2
马进分享的代码片段,摘录于 https://thinking.tomotoes.com/archives/2020/12/25-31 (opens in a new tab)
// 马老师 version
const getRequestWithLimit = (limit) => {
let count = 0;
const blockQueue = [];
return async (fn) => {
count++;
if (count > limit) {
// 这里 await resolve 理解绝了 resolve 之后再开始后续的 fn()
// 可以理解为用 await resolve 来阻塞 线程执行
await new Promise((resolve) => blockQueue.push(resolve));
}
try {
await fn();
} catch (e) {
return Promise.reject(e);
} finally {
count--;
blockQueue.length && blockQueue.shift()();
}
};
};
const requestWithLimit2 = getRequestWithLimit(2);
requestWithLimit2(timeout(100));