Skip to content

一道并发相关的面试题

这篇文章是并发异步操作系列文章的最终篇,我们以一道常见的面试题来结束当前这个系列。

题目#

先来看题目的描述:

实现一个 Scheduler,最多同时运行 n 个异步任务。add(fn) 接收返回 Promise 的函数,返回一个在该任务完成时 resolve 的 Promise。

// 要求:最多并发 2 个任务,输出顺序应为 2 3 1 4
const timeout = (t, tag) =>
new Promise((r) => setTimeout(() => (console.log(tag), r(tag)), t))
class Scheduler {
/* TODO */
}
const scheduler = new Scheduler(2)
const addTask = (t, tag) => scheduler.add(() => timeout(t, tag))
addTask(1000, '1')
addTask(500, '2')
addTask(300, '3')
addTask(400, '4')
// 期望打印:2 3 1 4

分析#

首先我们对该题目做一个简单的分析。

先看一下这个方法:

const timeout = (t, tag) =>
new Promise((r) => setTimeout(() => (console.log(tag), r(tag)), t))

这是一个个“延时完成的异步任务”工厂,用来模拟不同耗时的 Promise。

整个 Promise 不会 reject,只会在 t 毫秒后 resolve(tag)。

面试题一般就喜欢写得比较“花”,因此现在的可读性其实是比较低的,我们换一种写法:

const timeout = (t, tag) =>
new Promise((resolve) => {
setTimeout(() => {
console.log(tag) // 打印标记
resolve(tag) // 用 tag 作为任务结果
}, t)
})

接下来 Scheduler 是我们需要实现的任务调度器类:

class Scheduler {
/* TODO */
}
const scheduler = new Scheduler(2)

该类在实例化的时候接收一个数字的参数,很明显这是一个并发数的上限,相当于我们上一篇文章中工人的数量。

下面是一个 addTask 方法:

const addTask = (t, tag) => scheduler.add(() => timeout(t, tag))

思考🤔:为啥不写成 scheduler.add(timeout(t, tag))

回答:那样会立刻启动这个 Promise,完全绕过调度器,导致并发上限失效。传函数进去,才能让调度器控制何时开始。

当然,目前的写法同样比较“花”,可读性较低。实际上上面的写法等价于下面的写法:

function addTask(t, tag) {
// 返回一个在该任务完成时 resolve 的 Promise
return scheduler.add(() => timeout(t, tag))
}

最后是这一段代码:

addTask(1000, '1')
addTask(500, '2')
addTask(300, '3')
addTask(400, '4')
// 期望打印:2 3 1 4

为什么会是这样的结果呢?

因为上面的代码中设置的并发上限为 2,这意味着只有两项能同时跑,后面的要等有“坑位”。按时间线看:

用一条小时间轴更直观:

1: [0----------------------1000)
2: [0----------500)
3: [500-----800)
4: [800--------1200)
输出: 2 3 1 4

实现#

对题目分析完成后,接下来我们来一点一点完成该题目。

步骤一:初始化队列与并发上限、计数器

我们需要一个等待队列存任务,以及两个计数器来感知池子的占用与进度。

class Scheduler {
constructor(ConcurrencyCount = 3) {
this.ConcurrencyCount = ConcurrencyCount // 并发上限(池子大小)
this.tasks = [] // 等待队列:{ task, resolve, reject }
this.runningCount = 0 // 当前正在执行的数量
this.completedCount = 0 // 已完成任务数(可选,便于观测)
}
}

并发上限决定“同时能跑多少个”;runningCount 控制是否还能再启动新任务;completedCount 纯属“观测指标”,方便打印状态或做进度条。

步骤二:add(task) 入队,并返回“该任务完成时会 resolve 的 Promise”

根据题干要求:add(fn) 接收返回 Promise 的函数,并返回一个在该任务完成时 resolve 的 Promise。用 Promise.withResolvers()能优雅地拿到 resolve/reject

add(task) {
const { promise, resolve, reject } = Promise.withResolvers();
// 入队:把任务和它对应的“完成/失败回调”绑在一起
this.tasks.push({ task, resolve, reject });
// 入队后立刻尝试调度(填满并发窗口)
this._run();
// 把“该任务最终结果”的 promise 返回给调用方
return promise;
}

add 里把 resolve/reject 和任务绑在一起,等任务真跑完,再把结果原样交回调用方。

步骤三:调度器 _run()——填满并发窗口

_run() 的职责只有一个:在有空位、且有待执行任务时,把队首任务启动起来

_run() {
while (this.runningCount < this.ConcurrencyCount && this.tasks.length > 0) {
const { task, resolve, reject } = this.tasks.shift(); // 取出一个任务
this.runningCount++; // 占一个“坑位”,马上开跑
task()
.then(resolve) // 把该任务的结果交还给 add() 返回的 promise
.catch(reject) // 失败也要透传给调用方
.finally(() => { // 无论成败,都要“归还坑位”,继续调度
this.runningCount--;
this.completedCount++;
this._run(); // 继续尝试启动下一项,保持窗口尽可能满
});
}
}

while 能在一次调度里尽量把窗口填满;把“再调度”放在 finally,确保一有空位立刻补上。

完整的实现如下:

class Scheduler {
constructor(ConcurrencyCount = 3) {
this.ConcurrencyCount = ConcurrencyCount // 最大并发数
this.tasks = [] // 等待队列
this.runningCount = 0 // 当前正在执行的数量
this.completedCount = 0 // 已完成任务数
}
add(task) {
const { promise, resolve, reject } = Promise.withResolvers()
this.tasks.push({ task, resolve, reject })
this._run()
return promise
}
_run() {
while (this.runningCount < this.ConcurrencyCount && this.tasks.length > 0) {
const { task, resolve, reject } = this.tasks.shift()
this.runningCount++
task()
.then(resolve)
.catch(reject)
.finally(() => {
this.runningCount--
this.completedCount++
this._run() // 尝试调度下一个任务
})
}
}
}

写在最后#

异步对于初学者来讲,本来就是一个难点,一旦叠上并发与限流,哪怕老手也容易绕晕。

这个系列从 Abort/AbortSignal 起步,一点一点介绍 Promise 上面的常见方法,做了可复用的超时/取消小工具,落地了可配置并发上限的执行器,最后用一题经典 Scheduler 面试题结尾。

希望本系列文章能让同学们写异步更加有谱:该并发时并发、该限流时限流、若能帮助同学们少踩一个坑,那么这套系列文章也就值了。

下一篇文章我将新开一个系列,大家对什么感兴趣,可以在评论区留言。