这篇文章是并发异步操作系列文章的最终篇,我们以一道常见的面试题来结束当前这个系列。
题目#
先来看题目的描述:
实现一个 Scheduler,最多同时运行 n 个异步任务。add(fn) 接收返回 Promise 的函数,返回一个在该任务完成时 resolve 的 Promise。
// 要求:最多并发 2 个任务,输出顺序应为 2 3 1 4const timeout = (t, tag) => new Promise((r) => setTimeout(() => (console.log(tag), r(tag)), t))
class Scheduler { /* TODO */}
const scheduler = new Scheduler(2)const addTask = (t, tag) => scheduler.add(() => timeout(t, tag))
addTask(1000, '1')addTask(500, '2')addTask(300, '3')addTask(400, '4')// 期望打印:2 3 1 4分析#
首先我们对该题目做一个简单的分析。
先看一下这个方法:
const timeout = (t, tag) => new Promise((r) => setTimeout(() => (console.log(tag), r(tag)), t))这是一个个“延时完成的异步任务”工厂,用来模拟不同耗时的 Promise。
timeout = (t, tag) => new Promise(r => …):返回一个 Promise,r是resolve。setTimeout(() => (console.log(tag), r(tag)), t):在 t 毫秒后执行回调。这个箭头函数的函数体不是{ ... },而是一个表达式,表达式里用了逗号运算符,先console.log(tag),再r(tag)。所以会先打印 tag,然后把 tag 作为结果resolve。
整个 Promise 不会 reject,只会在 t 毫秒后 resolve(tag)。
面试题一般就喜欢写得比较“花”,因此现在的可读性其实是比较低的,我们换一种写法:
const timeout = (t, tag) => new Promise((resolve) => { setTimeout(() => { console.log(tag) // 打印标记 resolve(tag) // 用 tag 作为任务结果 }, t) })接下来 Scheduler 是我们需要实现的任务调度器类:
class Scheduler { /* TODO */}
const scheduler = new Scheduler(2)该类在实例化的时候接收一个数字的参数,很明显这是一个并发数的上限,相当于我们上一篇文章中工人的数量。
下面是一个 addTask 方法:
const addTask = (t, tag) => scheduler.add(() => timeout(t, tag))-
addTask是一个辅助函数,接收t和tag,返回的是scheduler.add(...)的结果。那么我们现在获取到了一个新的信息,那就是要实现的 Scheduler 类上面,一定会有一个 add 方法。 -
scheduler.add的参数必须是一个返回 Promise 的函数(题干信息)。这里传入的是() => timeout(t, tag):-
这是一个惰性函数,不会立刻执行
timeout; -
它闭包住了
t与tag,等调度器排到它、真正调用这函数时,才会执行timeout(t, tag)并开始计时。
-
思考🤔:为啥不写成 scheduler.add(timeout(t, tag))?
回答:那样会立刻启动这个 Promise,完全绕过调度器,导致并发上限失效。传函数进去,才能让调度器控制何时开始。
当然,目前的写法同样比较“花”,可读性较低。实际上上面的写法等价于下面的写法:
function addTask(t, tag) { // 返回一个在该任务完成时 resolve 的 Promise return scheduler.add(() => timeout(t, tag))}最后是这一段代码:
addTask(1000, '1')addTask(500, '2')addTask(300, '3')addTask(400, '4')// 期望打印:2 3 1 4为什么会是这样的结果呢?
因为上面的代码中设置的并发上限为 2,这意味着只有两项能同时跑,后面的要等有“坑位”。按时间线看:
t=0:启动任务 1(1000)和任务 2(500)。t=500:任务 2 先完成→打印 2;此时空出一个坑位,立刻启动任务 3(300)。t=800:任务 3 完成→打印 3;再启动任务 4(400)。t=1000:任务 1 完成→打印 1。t=1200:任务 4 完成→打印 4。
用一条小时间轴更直观:
1: [0----------------------1000)2: [0----------500)3: [500-----800)4: [800--------1200)输出: 2 3 1 4实现#
对题目分析完成后,接下来我们来一点一点完成该题目。
步骤一:初始化队列与并发上限、计数器
我们需要一个等待队列存任务,以及两个计数器来感知池子的占用与进度。
class Scheduler { constructor(ConcurrencyCount = 3) { this.ConcurrencyCount = ConcurrencyCount // 并发上限(池子大小) this.tasks = [] // 等待队列:{ task, resolve, reject } this.runningCount = 0 // 当前正在执行的数量 this.completedCount = 0 // 已完成任务数(可选,便于观测) }}并发上限决定“同时能跑多少个”;runningCount 控制是否还能再启动新任务;completedCount 纯属“观测指标”,方便打印状态或做进度条。
步骤二:add(task) 入队,并返回“该任务完成时会 resolve 的 Promise”
根据题干要求:add(fn) 接收返回 Promise 的函数,并返回一个在该任务完成时 resolve 的 Promise。用 Promise.withResolvers()能优雅地拿到 resolve/reject。
add(task) { const { promise, resolve, reject } = Promise.withResolvers(); // 入队:把任务和它对应的“完成/失败回调”绑在一起 this.tasks.push({ task, resolve, reject }); // 入队后立刻尝试调度(填满并发窗口) this._run(); // 把“该任务最终结果”的 promise 返回给调用方 return promise;}在 add 里把 resolve/reject 和任务绑在一起,等任务真跑完,再把结果原样交回调用方。
步骤三:调度器 _run()——填满并发窗口
_run() 的职责只有一个:在有空位、且有待执行任务时,把队首任务启动起来
_run() { while (this.runningCount < this.ConcurrencyCount && this.tasks.length > 0) { const { task, resolve, reject } = this.tasks.shift(); // 取出一个任务 this.runningCount++; // 占一个“坑位”,马上开跑
task() .then(resolve) // 把该任务的结果交还给 add() 返回的 promise .catch(reject) // 失败也要透传给调用方 .finally(() => { // 无论成败,都要“归还坑位”,继续调度 this.runningCount--; this.completedCount++; this._run(); // 继续尝试启动下一项,保持窗口尽可能满 }); }}用 while 能在一次调度里尽量把窗口填满;把“再调度”放在 finally,确保一有空位立刻补上。
完整的实现如下:
class Scheduler { constructor(ConcurrencyCount = 3) { this.ConcurrencyCount = ConcurrencyCount // 最大并发数 this.tasks = [] // 等待队列 this.runningCount = 0 // 当前正在执行的数量 this.completedCount = 0 // 已完成任务数 }
add(task) { const { promise, resolve, reject } = Promise.withResolvers() this.tasks.push({ task, resolve, reject }) this._run() return promise }
_run() { while (this.runningCount < this.ConcurrencyCount && this.tasks.length > 0) { const { task, resolve, reject } = this.tasks.shift() this.runningCount++
task() .then(resolve) .catch(reject) .finally(() => { this.runningCount-- this.completedCount++ this._run() // 尝试调度下一个任务 }) } }}写在最后#
异步对于初学者来讲,本来就是一个难点,一旦叠上并发与限流,哪怕老手也容易绕晕。
这个系列从 Abort/AbortSignal 起步,一点一点介绍 Promise 上面的常见方法,做了可复用的超时/取消小工具,落地了可配置并发上限的执行器,最后用一题经典 Scheduler 面试题结尾。
希望本系列文章能让同学们写异步更加有谱:该并发时并发、该限流时限流、若能帮助同学们少踩一个坑,那么这套系列文章也就值了。
下一篇文章我将新开一个系列,大家对什么感兴趣,可以在评论区留言。