Skip to content

Generator 进阶

在掌握 Generator 基础后,来看它在异步编程和更复杂场景中的应用。

Generator 与异步#

Generator 的暂停-恢复特性非常适合处理异步操作。

手动执行异步 Generator#

/**
 * Generator describing a two-step user request.
 *
 * Yields the value of `fetch('/api/user')`, then the value of
 * `response.json()`; whatever the driver sends back through `next()` is
 * used as the response and the decoded user respectively. Logs the user
 * and finishes without a return value.
 */
function* fetchUser() {
  const res = yield fetch('/api/user')
  const decodedUser = yield res.json()
  console.log(decodedUser)
}
// Manual driver
/**
 * Drives a generator that yields promises (or plain values) to completion.
 *
 * Each yielded value is awaited; its fulfilment value is sent back with
 * `next()`, and a rejection is thrown into the generator with `throw()`.
 *
 * @param {Function} generator - Generator function; it is called with no arguments.
 * @returns {Promise<*>} Resolves with the generator's return value, or rejects
 *   with any error the generator does not handle (including one thrown before
 *   its first `yield`).
 */
function run(generator) {
  const gen = generator()

  // `advance` performs exactly one gen.next()/gen.throw() call. Wrapping it
  // lets a synchronous throw from the generator become a rejection.
  function step(advance) {
    let result
    try {
      result = advance()
    } catch (err) {
      return Promise.reject(err)
    }
    if (result.done) return Promise.resolve(result.value)

    // Two-argument then(): the rejection handler only sees a rejection of the
    // yielded value itself, never a failure from a later step. A chained
    // .catch() would call gen.throw() again at every enclosing level.
    return Promise.resolve(result.value).then(
      (data) => step(() => gen.next(data)),
      (err) => step(() => gen.throw(err))
    )
  }

  return step(() => gen.next())
}
// Drive fetchUser to completion; the promise returned by run() is not used here.
run(fetchUser)

自动执行器(co 模式)#

/**
 * Minimal co-style runner: executes a generator that yields promises (or
 * plain values) as if each `yield` were an `await`.
 *
 * @param {Function} generator - Generator function; it is called with no arguments.
 * @returns {Promise<*>} Resolves with the generator's return value; rejects
 *   with any error the generator throws or leaves unhandled.
 */
function co(generator) {
  return new Promise((resolve, reject) => {
    const gen = generator()

    // Resume the generator with gen.next(arg) or gen.throw(arg), then wire up
    // whatever it yields next. Both paths share this function so that every
    // yielded promise, including one yielded after a caught error, has its
    // rejection thrown back into the generator.
    function advance(method, arg) {
      let result
      try {
        result = gen[method](arg)
      } catch (e) {
        return reject(e)
      }
      if (result.done) {
        return resolve(result.value)
      }
      Promise.resolve(result.value).then(
        (value) => advance('next', value),
        (err) => advance('throw', err)
      )
    }

    advance('next')
  })
}
// Usage: every `yield` hands a promise to co(); the generator resumes with
// the value that promise fulfils with.
co(function* () {
const user = yield fetch('/api/user').then((r) => r.json())
// The posts URL is built from user.id, so this request starts only after
// the first one has produced a user.
const posts = yield fetch(`/api/posts?userId=${user.id}`).then((r) =>
r.json()
)
console.log(posts)
})

对比 async/await#

Generator + co 是 async/await 的前身:

// Generator
// `yield` inside co() plays the role of `await`: co() resumes the generator
// with each promise's value, and a rejection reaches the catch block below.
co(function* () {
try {
const a = yield Promise.resolve(1)
const b = yield Promise.resolve(2)
return a + b
} catch (e) {
console.error(e)
}
})
// async/await
// Same control flow written with native syntax; the leading `;` guards the
// immediately-invoked function against the preceding line in no-semicolon style.
;(async () => {
try {
const a = await Promise.resolve(1)
const b = await Promise.resolve(2)
return a + b
} catch (e) {
console.error(e)
}
})()

协程概念#

Generator 可以实现协程(coroutine),多个执行流程之间协作:

// Producer
/**
 * Producer half of the coroutine pair: pushes the numbers 1..5 into a
 * consumer generator and then shuts it down.
 *
 * This generator contains no `yield`, so the first `next()` call on it runs
 * the whole exchange.
 *
 * @param {Generator} consumer - Generator object that receives each value via
 *   `next(value)` and treats `undefined` as the end-of-stream signal.
 */
function* producer(consumer) {
  console.log('生产者启动')
  consumer.next() // Prime the consumer so it is paused at its first `yield`.
  for (let n = 1; n <= 5; n++) {
    console.log(`生产: ${n}`)
    consumer.next(n)
  }
  // Send the `undefined` sentinel so the consumer leaves its loop and runs
  // its own shutdown code. return() alone would stop it at the `yield`.
  consumer.next()
  // Close a consumer that ignored the sentinel; no-op if it already finished.
  consumer.return()
}
// Consumer
/**
 * Consumer half of the coroutine pair. Logs a start message, then logs every
 * value sent in through `next(value)` until it receives `undefined`, at which
 * point it logs an end message and finishes.
 */
function* consumer() {
  console.log('消费者启动')
  // Each bare `yield` pauses until the next value is sent in.
  for (let received = yield; received !== undefined; received = yield) {
    console.log(`消费: ${received}`)
  }
  console.log('消费者结束')
}
// Run
// producer() contains no `yield`, so the single p.next() call below runs the
// whole producer/consumer exchange.
const c = consumer()
const p = producer(c)
p.next()
// Expected console output:
// NOTE(review): the final line is printed only when the consumer's loop exits
// through its `undefined` check; calling return() on it stops it at the
// `yield` before that log runs.
// 生产者启动
// 消费者启动
// 生产: 1
// 消费: 1
// 生产: 2
// 消费: 2
// ...
// 消费者结束

双向通信#

/**
 * Endless generator that yields the string 'ping' and logs whatever value is
 * sent back in before yielding again.
 */
function* ping() {
  for (;;) {
    const reply = yield 'ping'
    console.log('收到:', reply)
  }
}
/**
 * Counterpart to a ping-style generator. On every step it logs the last value
 * received from `pingGen`, answers with 'pong', stores the value that comes
 * back, and then pauses.
 *
 * @param {Generator} pingGen - Object with a `next(value)` method returning `{ value }`.
 */
function* pong(pingGen) {
  // The first call takes no argument: it only starts the other generator.
  let incoming = pingGen.next().value
  for (;;) {
    console.log('收到:', incoming)
    const answer = pingGen.next('pong')
    incoming = answer.value
    yield
  }
}
const pingGen = ping()
const pongGen = pong(pingGen)
// Each pongGen.next() first logs the message pong holds ('ping'), then sends
// 'pong' to pingGen, which logs it before yielding 'ping' again.
pongGen.next() // 收到: ping, 收到: pong
pongGen.next() // 收到: ping, 收到: pong

流处理#

数据转换管道#

/**
 * Lazily applies `fn` to every item of `iterable`.
 *
 * @param {Iterable} iterable - Source items.
 * @param {Function} fn - Called with one item at a time.
 * @yields The result of `fn(item)` for each source item, in order.
 */
function* map(iterable, fn) {
  for (const element of iterable) {
    const mapped = fn(element)
    yield mapped
  }
}
/**
 * Lazily yields only the items of `iterable` for which `predicate` returns a
 * truthy value.
 *
 * @param {Iterable} iterable - Source items.
 * @param {Function} predicate - Called with one item at a time.
 */
function* filter(iterable, predicate) {
  for (const element of iterable) {
    if (!predicate(element)) continue
    yield element
  }
}
/**
 * Lazily yields at most the first `n` items of `iterable`.
 *
 * The source is never asked for more than `n` items. This matters for
 * infinite, slow or side-effecting sources, where requesting one item beyond
 * the limit would waste work or never return.
 *
 * @param {Iterable} iterable - Source items.
 * @param {number} n - Maximum number of items to yield; `n <= 0` yields nothing.
 */
function* take(iterable, n) {
  if (n <= 0) return
  let taken = 0
  for (const item of iterable) {
    yield item
    taken += 1
    // Stop right after the n-th item instead of pulling an (n+1)-th one just
    // to discard it. Leaving the loop also closes the source iterator.
    if (taken >= n) return
  }
}
/**
 * Lazily yields the items of `iterable` after dropping the first `n`.
 *
 * @param {Iterable} iterable - Source items.
 * @param {number} n - Number of leading items to drop.
 */
function* skip(iterable, n) {
  let index = 0
  for (const element of iterable) {
    const stillSkipping = index < n
    index += 1
    if (!stillSkipping) {
      yield element
    }
  }
}
// Chained processing
/**
 * Threads `source` through each transform in order: the output of one
 * transform becomes the input of the next.
 *
 * @param {*} source - Initial value (typically an iterable).
 * @param {...Function} transforms - Functions called with one argument each.
 * @returns {*} The result of the last transform, or `source` if none are given.
 */
function pipe(source, ...transforms) {
  let current = source
  for (const transform of transforms) {
    current = transform(current)
  }
  return current
}
// Demo: keep the even numbers, double them, then stop after three results.
const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
const result = [
...pipe(
numbers,
// Each stage wraps the previous iterable in another generator.
(x) => filter(x, (n) => n % 2 === 0),
(x) => map(x, (n) => n * 2),
(x) => take(x, 3)
),
]
// [4, 8, 12]

分块处理#

/**
 * Lazily groups the items of `iterable` into arrays of `size` items. The last
 * array holds whatever is left over and may be shorter; an empty source
 * yields nothing.
 *
 * @param {Iterable} iterable - Source items.
 * @param {number} size - Number of items per yielded array.
 */
function* chunk(iterable, size) {
  let pending = []
  for (const element of iterable) {
    pending.push(element)
    if (pending.length !== size) continue
    yield pending
    // Start a new array so the one already yielded is never mutated.
    pending = []
  }
  if (pending.length > 0) yield pending
}
;[...chunk([1, 2, 3, 4, 5, 6, 7], 3)]
// [[1, 2, 3], [4, 5, 6], [7]]

滑动窗口#

/**
 * Lazily yields every run of `size` consecutive items from `iterable`,
 * advancing one item at a time. Yields nothing if the source has fewer than
 * `size` items.
 *
 * @param {Iterable} iterable - Source items.
 * @param {number} size - Length of each yielded array.
 */
function* slidingWindow(iterable, size) {
  const frame = []
  for (const element of iterable) {
    frame.push(element)
    if (frame.length !== size) continue
    // Yield a copy: `frame` keeps being mutated as the window moves on.
    yield frame.slice()
    frame.shift()
  }
}
;[...slidingWindow([1, 2, 3, 4, 5], 3)]
// [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

并发控制#

任务队列#

/**
 * Runs promise-returning tasks with a concurrency limit and yields one
 * promise per task outcome, in completion order.
 *
 * Intended to be consumed sequentially, e.g. with `for await`: the consumer
 * must wait for each yielded promise before asking for the next one. A new
 * task is started only when the generator is resumed and fewer than
 * `concurrency` tasks are still running.
 *
 * @param {Iterable<Function>} tasks - Functions that each return a promise.
 * @param {number} concurrency - Maximum number of tasks running at once.
 * @yields {Promise<*>} A promise for the next finished task's result; it
 *   rejects if that task rejected.
 */
function* taskQueue(tasks, concurrency) {
  const executing = new Set()
  // Outcomes of finished tasks that have not been handed out yet. Several
  // tasks can settle between two pulls, so a single Promise.race() per pull
  // would silently drop all but one of them.
  const settled = []

  const deliver = ({ failed, payload }) =>
    failed ? Promise.reject(payload) : Promise.resolve(payload)

  const nextOutcome = () =>
    settled.length > 0
      ? deliver(settled.shift())
      : Promise.race(executing).then(() => deliver(settled.shift()))

  for (const task of tasks) {
    // Both handlers free the slot and queue the outcome, so `tracked` itself
    // never rejects and a failed task does not occupy a slot forever.
    const tracked = task().then(
      (result) => {
        executing.delete(tracked)
        settled.push({ failed: false, payload: result })
      },
      (error) => {
        executing.delete(tracked)
        settled.push({ failed: true, payload: error })
      }
    )
    executing.add(tracked)
    // Keep handing out outcomes until a slot is actually free. The size > 0
    // guard prevents racing an empty set when `concurrency` is 0 or negative.
    while (executing.size > 0 && executing.size >= concurrency) {
      yield nextOutcome()
    }
  }

  // Drain tasks that are still running and outcomes that are still queued.
  while (executing.size > 0 || settled.length > 0) {
    yield nextOutcome()
  }
}
/**
 * Runs `tasks` through taskQueue and collects every value it produces.
 *
 * @param {Iterable<Function>} tasks - Functions that each return a promise.
 * @param {number} concurrency - Passed through to taskQueue.
 * @returns {Promise<Array>} The values in the order the queue produced them.
 */
async function runTasks(tasks, concurrency) {
  const collected = []
  const queue = taskQueue(tasks, concurrency)
  // `for await` waits for each yielded promise before resuming the generator.
  for await (const value of queue) {
    collected.push(value)
  }
  return collected
}
// Usage
// Ten task factories; task i resolves with i after a random delay of less
// than one second.
const tasks = Array.from(
{ length: 10 },
(_, i) => () =>
new Promise((resolve) => setTimeout(() => resolve(i), Math.random() * 1000))
)
// Run with a concurrency of 3 and log the collected results.
runTasks(tasks, 3).then(console.log)

重试机制#

/**
 * Generator describing a retry loop for a driver such as runWithRetry.
 *
 * Per attempt it yields `fn()`. If the driver resumes it with `next(value)`,
 * it returns that value. If the driver throws into it, it either rethrows
 * (on the last attempt) or logs and yields a promise that resolves after
 * `delay` milliseconds before trying again.
 *
 * @param {Function} fn - Operation to attempt; called with no arguments.
 * @param {number} maxAttempts - Total number of attempts allowed.
 * @param {number} delay - Milliseconds to wait between attempts.
 */
function* retry(fn, maxAttempts, delay) {
  let attempt = 1
  while (attempt <= maxAttempts) {
    try {
      return yield fn()
    } catch (error) {
      if (attempt === maxAttempts) {
        throw error
      }
      console.log(`尝试 ${attempt} 失败,${delay}ms 后重试...`)
      yield new Promise((resolve) => setTimeout(resolve, delay))
    }
    attempt += 1
  }
}
/**
 * Drives the retry generator: awaits each yielded value, sends fulfilment
 * values back in with `next()` and rejections with `throw()`.
 *
 * @param {Function} fn - Operation to attempt.
 * @param {number} [maxAttempts=3] - Total number of attempts allowed.
 * @param {number} [delay=1000] - Milliseconds to wait between attempts.
 * @returns {Promise<*>} The generator's return value; rejects with whatever
 *   the generator throws.
 */
async function runWithRetry(fn, maxAttempts = 3, delay = 1000) {
  const attempts = retry(fn, maxAttempts, delay)
  let step = attempts.next()
  for (;;) {
    if (step.done) return step.value
    try {
      step = attempts.next(await step.value)
    } catch (error) {
      // If the generator rethrows here, the error leaves this function.
      step = attempts.throw(error)
    }
  }
}

状态机进阶#

HTTP 请求状态机#

/**
 * Generator-based state machine for a single HTTP request at a time.
 *
 * Protocol (what the driver sends -> state yielded back):
 *   first next()     -> 'idle'
 *   next('fetch')    -> 'loading'   (any other action yields 'idle' again)
 *   next()           -> 'fetching'
 *   next(response)   -> { state: 'success', data: response }
 *   throw(error)     -> { state: 'error', error }
 *   next()           -> 'idle'
 *
 * Only the 'fetching' step is inside the try block, so throw() at any other
 * step is not caught and terminates the generator.
 */
function* httpRequestStateMachine() {
  while (true) {
    const action = yield 'idle'
    if (action === 'fetch') {
      yield 'loading'
      try {
        const response = yield 'fetching'
        yield { state: 'success', data: response }
      } catch (error) {
        yield { state: 'error', error }
      }
    }
  }
}

/**
 * Performs JSON requests while mirroring their progress in `state`, which is
 * always the value most recently yielded by httpRequestStateMachine.
 */
class RequestManager {
  constructor() {
    this.machine = httpRequestStateMachine()
    this.state = this.machine.next().value
  }

  /**
   * Fetches `url` and decodes the body as JSON.
   *
   * @param {string} url - Passed to the global `fetch`.
   * @returns {Promise<*>} The decoded JSON; `state` ends as a success object.
   * @throws Rethrows any fetch/decoding error after `state` becomes an error
   *   object; rejects immediately if another request is still in flight.
   */
  async fetch(url) {
    // String states other than 'idle' mean a request is in flight. Advancing
    // the machine now would be misread as that request's response.
    if (typeof this.state === 'string' && this.state !== 'idle') {
      throw new Error('RequestManager: a request is already in progress')
    }
    // After a finished request the machine is parked on its result object;
    // one plain next() brings it back to 'idle'.
    if (this.state !== 'idle') {
      this.state = this.machine.next().value
    }
    this.state = this.machine.next('fetch').value // loading
    // Step into the machine's try block so that both next(data) and
    // throw(error) below are handled by it.
    this.state = this.machine.next().value // fetching
    try {
      const response = await fetch(url)
      const data = await response.json()
      this.state = this.machine.next(data).value // success
      return this.state.data
    } catch (error) {
      this.state = this.machine.throw(error).value // error
      throw error
    }
  }
}

游戏循环#

/**
 * Game flow as a generator-based state machine.
 *
 * Each `next(input)` call yields a description of the current screen; the
 * input sent with the following call picks the next state. Unrecognised
 * inputs leave the state unchanged. Sending 'quit' while on the menu ends
 * the generator with the return value 'game ended'.
 */
function* gameLoop() {
  let state = 'menu'
  for (;;) {
    if (state === 'menu') {
      const action = yield { state: 'menu', options: ['start', 'quit'] }
      if (action === 'quit') return 'game ended'
      if (action === 'start') state = 'playing'
    } else if (state === 'playing') {
      const event = yield { state: 'playing', message: 'Game running...' }
      if (event === 'win') {
        state = 'victory'
      } else if (event === 'lose') {
        state = 'gameOver'
      } else if (event === 'pause') {
        state = 'paused'
      }
    } else if (state === 'paused') {
      const choice = yield { state: 'paused', message: 'Game paused' }
      if (choice === 'resume') {
        state = 'playing'
      } else if (choice === 'quit') {
        state = 'menu'
      }
    } else if (state === 'victory') {
      // Result screens ignore their input and always return to the menu.
      yield { state: 'victory', message: 'You won!' }
      state = 'menu'
    } else if (state === 'gameOver') {
      yield { state: 'gameOver', message: 'Game Over' }
      state = 'menu'
    }
  }
}

惰性求值#

// Lazily evaluated, possibly infinite list
/**
 * Chainable lazy sequence. Each instance wraps a generator function, and
 * every operator returns a new LazyList wrapping another generator, so no
 * item is computed until the list is iterated.
 */
class LazyList {
  /**
   * @param {Function} generator - Generator function called with no arguments
   *   each time the list is iterated.
   */
  constructor(generator) {
    this.generator = generator
  }

  /** Iterating the list delegates to a fresh run of the wrapped generator. */
  *[Symbol.iterator]() {
    yield* this.generator()
  }

  /**
   * @param {Function} fn - Called with each item.
   * @returns {LazyList} List of `fn(item)` results.
   */
  map(fn) {
    // Generator functions cannot be arrow functions, so the outer `this` is
    // captured in a variable.
    const self = this
    return new LazyList(function* () {
      for (const item of self) {
        yield fn(item)
      }
    })
  }

  /**
   * @param {Function} predicate - Called with each item.
   * @returns {LazyList} List of the items for which `predicate` is truthy.
   */
  filter(predicate) {
    const self = this
    return new LazyList(function* () {
      for (const item of self) {
        if (predicate(item)) yield item
      }
    })
  }

  /**
   * @param {number} n - Maximum number of items; `n <= 0` gives an empty list.
   * @returns {LazyList} List of at most the first `n` items.
   */
  take(n) {
    const self = this
    return new LazyList(function* () {
      if (n <= 0) return
      let taken = 0
      for (const item of self) {
        yield item
        taken += 1
        // Stop right after the n-th item. Requesting one more from upstream
        // would waste work and never returns if no further item exists.
        if (taken >= n) return
      }
    })
  }

  /** @returns {Array} All items; only terminates for finite lists. */
  toArray() {
    return [...this]
  }

  /**
   * @param {Iterable} iterable - Delegated to with `yield*` on every iteration.
   * @returns {LazyList}
   */
  static from(iterable) {
    return new LazyList(function* () {
      yield* iterable
    })
  }

  /**
   * @param {number} start - First value.
   * @param {number} end - Last value, inclusive; may be `Infinity`.
   * @returns {LazyList} List of `start`, `start + 1`, ... up to `end`.
   */
  static range(start, end) {
    return new LazyList(function* () {
      for (let i = start; i <= end; i++) {
        yield i
      }
    })
  }
}
// Usage
// The range is unbounded; the chain still terminates because toArray() only
// iterates the result of take(5).
LazyList.range(1, Infinity)
.filter((n) => n % 2 === 0)
.map((n) => n * n)
.take(5)
.toArray()
// [4, 16, 36, 64, 100]

小结#

| 应用场景 | 说明 |
| --- | --- |
| 异步流程控制 | co 模式,async/await 的前身 |
| 协程 | 多执行流协作 |
| 数据流处理 | 惰性求值、管道转换 |
| 状态机 | 复杂状态转换 |
| 并发控制 | 任务队列、重试机制 |

Generator 是 JavaScript 中非常强大的特性,虽然 async/await 更常用,但理解 Generator 对于深入理解 JavaScript 异步编程至关重要。