在掌握 Generator 基础后,来看它在异步编程和更复杂场景中的应用。
## Generator 与异步
Generator 的暂停-恢复特性非常适合处理异步操作。
### 手动执行异步 Generator
/**
 * Generator describing an asynchronous "load the user" flow.
 *
 * It first yields the value returned by `fetch('/api/user')` and expects the
 * driver to send the settled response back in. It then yields
 * `response.json()` and expects the parsed body back, which it logs.
 *
 * @yields {*} The `fetch(...)` result, then the `response.json()` result.
 * @returns {Generator} Finishes with `undefined` after logging the user.
 */
function* fetchUser() {
  const response = yield fetch('/api/user');
  const user = yield response.json();
  console.log(user);
}
/**
 * Manually drives a generator that yields promises (or plain values).
 *
 * Each yielded value is wrapped with `Promise.resolve`. When it fulfils, the
 * value is sent back into the generator with `next`; when it rejects, the
 * reason is thrown into the generator with `throw`, so the generator can use
 * ordinary `try/catch`.
 *
 * @param {function(): Generator} generator - Generator function, called with no arguments.
 * @returns {Promise<*>} Resolves with the generator's return value; rejects
 *   with any error the generator throws or leaves unhandled.
 */
function run(generator) {
  const gen = generator();

  function step(result) {
    if (result.done) {
      return Promise.resolve(result.value);
    }
    // Two-argument `then`: the rejection handler only sees a failure of the
    // yielded promise itself, never an error raised by a later step, so an
    // error is thrown into the generator at most once.
    return Promise.resolve(result.value).then(
      (data) => step(gen.next(data)),
      (err) => step(gen.throw(err)),
    );
  }

  try {
    return step(gen.next());
  } catch (err) {
    // The generator threw before its first yield: report it as a rejection
    // so callers always receive a promise.
    return Promise.reject(err);
  }
}
// Drive the fetchUser generator; report a failed request instead of leaving
// the returned promise unhandled.
run(fetchUser).catch(console.error);

// Section: Automatic executor, co pattern (自动执行器(co 模式))
/**
 * Minimal `co`-style executor: runs a generator to completion, treating each
 * yielded value as a promise.
 *
 * A fulfilled value is sent back into the generator with `next`; a rejection
 * is thrown into it with `throw`. Both paths go through the same `advance`
 * helper, so every rejection can be caught by the generator's own
 * `try/catch`, including rejections that occur after it has already
 * recovered from an earlier one.
 *
 * @param {function(): Generator} generator - Generator function, called with no arguments.
 * @returns {Promise<*>} Resolves with the generator's return value; rejects
 *   with any error the generator throws or does not catch.
 */
function co(generator) {
  return new Promise((resolve, reject) => {
    const gen = generator();

    // Resume the generator via `gen.next(arg)` or `gen.throw(arg)` and
    // schedule the following step from whatever it yields.
    function advance(method, arg) {
      let result;
      try {
        result = gen[method](arg);
      } catch (e) {
        reject(e);
        return;
      }

      if (result.done) {
        resolve(result.value);
        return;
      }

      Promise.resolve(result.value).then(
        (value) => advance('next', value),
        (err) => advance('throw', err),
      );
    }

    advance('next');
  });
}
// Usage: two dependent requests written in sequential style.
co(function* () {
  const user = yield fetch('/api/user').then((r) => r.json());
  const posts = yield fetch(`/api/posts?userId=${user.id}`).then((r) => r.json());
  console.log(posts);
}).catch(console.error); // surface request failures instead of dropping them

// Section: Comparison with async/await (对比 async/await)
Generator + co 是 async/await 的前身:
// Generator version
co(function* () {
  try {
    const a = yield Promise.resolve(1);
    const b = yield Promise.resolve(2);
    return a + b;
  } catch (e) {
    console.error(e);
  }
});

// async/await version: same shape, with `await` in place of `yield`.
(async () => {
  try {
    const a = await Promise.resolve(1);
    const b = await Promise.resolve(2);
    return a + b;
  } catch (e) {
    console.error(e);
  }
})();

// Section: Coroutine concept (协程概念)
Generator 可以实现协程(coroutine),多个执行流程之间协作:
/**
 * Producer coroutine: pushes the numbers 1 through 5 into a consumer.
 *
 * The consumer is first primed with an argument-less `next()` so that it is
 * suspended at its first `yield`. Each produced number is then delivered with
 * `consumer.next(n)`, and the consumer is closed with `consumer.return()`.
 * The generator body contains no `yield`, so a single `next()` on the
 * producer runs the whole exchange.
 *
 * @param {{next: function(*=): *, return: function(): *}} consumer - Generator
 *   object (or compatible object) that receives the values.
 * @returns {Generator} Finishes with `undefined`.
 */
function* producer(consumer) {
  console.log('生产者启动');
  consumer.next(); // start the consumer

  for (let n = 1; n <= 5; n++) {
    console.log(`生产: ${n}`);
    consumer.next(n);
  }

  consumer.return(); // finish the consumer
}
/**
 * Consumer coroutine: logs every value sent in through `next(value)`.
 *
 * It stops when it receives `undefined`. The closing message is logged from a
 * `finally` block so that it is also printed when the generator is closed
 * with `return()` while suspended at `yield`; code placed after the loop
 * would be skipped in that case.
 *
 * @returns {Generator} Finishes with `undefined`.
 */
function* consumer() {
  console.log('消费者启动');
  try {
    while (true) {
      const n = yield;
      if (n === undefined) break;
      console.log(`消费: ${n}`);
    }
  } finally {
    console.log('消费者结束');
  }
}
// Run: one next() on the producer drives the whole exchange.
const c = consumer();
const p = producer(c);
p.next();
// 生产者启动
// 消费者启动
// 生产: 1
// 消费: 1
// 生产: 2
// 消费: 2
// ...
// 消费者结束

// Section: Two-way communication (双向通信)
/**
 * Endless generator that yields the string 'ping' and logs whatever message
 * is sent back in through `next(msg)`.
 *
 * @yields {string} Always 'ping'.
 */
function* ping() {
  while (true) {
    const msg = yield 'ping';
    console.log('收到:', msg);
  }
}
/**
 * Counterpart of a ping generator: on every step it logs the message last
 * received from `pingGen`, answers with 'pong', stores the reply's value as
 * the next message, and then suspends.
 *
 * @param {{next: function(*=): {value: *}}} pingGen - Generator object to talk to.
 * @yields {undefined} Suspends once per exchange; no value is produced.
 */
function* pong(pingGen) {
  // The first next() only starts pingGen and fetches its first message.
  let msg = pingGen.next().value;
  while (true) {
    console.log('收到:', msg);
    msg = pingGen.next('pong').value;
    yield;
  }
}
const pingGen = ping();
const pongGen = pong(pingGen);

// Each step logs twice: pong logs the message it holds, then ping logs the
// 'pong' reply it receives.
pongGen.next(); // 收到: ping, 收到: pong
pongGen.next(); // 收到: ping, 收到: pong

// Section: Stream processing (流处理)
### 数据转换管道
/**
 * Lazily applies `fn` to every item of `iterable`.
 *
 * @param {Iterable<*>} iterable - Source items.
 * @param {function(*): *} fn - Called with each item.
 * @yields {*} `fn(item)` for each item, in source order.
 */
function* map(iterable, fn) {
  for (const item of iterable) {
    yield fn(item);
  }
}
/**
 * Lazily yields the items of `iterable` for which `predicate` returns a
 * truthy value.
 *
 * @param {Iterable<*>} iterable - Source items.
 * @param {function(*): *} predicate - Called with each item.
 * @yields {*} The accepted items, in source order.
 */
function* filter(iterable, predicate) {
  for (const item of iterable) {
    if (predicate(item)) {
      yield item;
    }
  }
}
/**
 * Lazily yields at most the first `n` items of `iterable`.
 *
 * The limit is checked right after an item is yielded, so the source is never
 * asked for an item beyond the n-th. This matters for expensive or infinite
 * sources, where fetching one extra item may never return.
 *
 * @param {Iterable<*>} iterable - Source items.
 * @param {number} n - Maximum number of items to yield.
 * @yields {*} The first `n` items, in source order.
 */
function* take(iterable, n) {
  if (n <= 0) return;

  let taken = 0;
  for (const item of iterable) {
    yield item;
    taken += 1;
    // Returning inside for...of also closes the source iterator.
    if (taken >= n) return;
  }
}
/**
 * Lazily yields the items of `iterable` after dropping the first `n`.
 *
 * @param {Iterable<*>} iterable - Source items.
 * @param {number} n - Number of leading items to drop.
 * @yields {*} The remaining items, in source order.
 */
function* skip(iterable, n) {
  let seen = 0;
  for (const item of iterable) {
    if (seen < n) {
      seen += 1;
      continue;
    }
    yield item;
  }
}
/**
 * Chains transforms: feeds `source` through each transform in order, passing
 * the result of one as the input of the next.
 *
 * @param {*} source - Initial value, typically an iterable.
 * @param {...function(*): *} transforms - Functions applied left to right.
 * @returns {*} The output of the last transform, or `source` when none are given.
 */
function pipe(source, ...transforms) {
  let current = source;
  for (const transform of transforms) {
    current = transform(current);
  }
  return current;
}
const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

// Keep the even numbers, double them, and stop after three results.
const result = [
  ...pipe(
    numbers,
    (x) => filter(x, (n) => n % 2 === 0),
    (x) => map(x, (n) => n * 2),
    (x) => take(x, 3),
  ),
];
// [4, 8, 12]

// Section: Chunking (分块处理)
/**
 * Lazily splits `iterable` into arrays of `size` items; the final array may
 * be shorter.
 *
 * @param {Iterable<*>} iterable - Source items.
 * @param {number} size - Items per chunk; must be a positive integer.
 * @yields {Array<*>} Consecutive chunks, in source order.
 * @throws {RangeError} On the first `next()` if `size` is not a positive
 *   integer. Such a size can never be reached by `batch.length`, so the
 *   whole input would otherwise be buffered into one chunk.
 */
function* chunk(iterable, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(`chunk size must be a positive integer, got ${size}`);
  }

  let batch = [];
  for (const item of iterable) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = []; // start a fresh array; the yielded one belongs to the caller
    }
  }

  if (batch.length > 0) {
    yield batch;
  }
}

[...chunk([1, 2, 3, 4, 5, 6, 7], 3)];
// [[1, 2, 3], [4, 5, 6], [7]]

// Section: Sliding window (滑动窗口)
/**
 * Lazily yields every run of `size` consecutive items of `iterable`,
 * advancing one item at a time. Nothing is yielded when the source has fewer
 * than `size` items.
 *
 * @param {Iterable<*>} iterable - Source items.
 * @param {number} size - Window length; must be a positive integer.
 * @yields {Array<*>} A fresh copy of the current window.
 * @throws {RangeError} On the first `next()` if `size` is not a positive
 *   integer. Such a size is never reached by the buffer length, so the
 *   buffer would otherwise grow without ever producing a window.
 */
function* slidingWindow(iterable, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(`window size must be a positive integer, got ${size}`);
  }

  const buffer = [];
  for (const item of iterable) {
    buffer.push(item);
    if (buffer.length === size) {
      yield [...buffer]; // copy, because the buffer keeps being mutated
      buffer.shift();
    }
  }
}

[...slidingWindow([1, 2, 3, 4, 5], 3)];
// [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

// Section: Concurrency control (并发控制)
### 任务队列
/**
 * Starts tasks with a concurrency limit and yields one promise per task
 * result, in completion order.
 *
 * A task is started when the generator reaches it. Once `concurrency` tasks
 * are in flight the generator yields a promise for the next task to settle,
 * and it keeps yielding such promises until every task has been reported.
 *
 * The consumer must wait for each yielded promise before calling `next()`
 * again, as `for await...of` does. A task is removed from the in-flight set
 * only when its result is handed out through a yielded promise, so several
 * tasks settling at the same moment are each still reported exactly once.
 *
 * If a task rejects, the yielded promise rejects with that reason.
 *
 * @param {Iterable<function(): *>} tasks - Functions that start a task and
 *   return its promise (a plain return value is accepted too).
 * @param {number} concurrency - Maximum number of tasks in flight.
 * @yields {Promise<*>} Promise for the result of the next task to settle.
 */
function* taskQueue(tasks, concurrency) {
  const executing = new Set();

  // Resolves with the value of whichever in-flight task settles first and
  // removes exactly that task from the set.
  const nextSettled = () =>
    Promise.race(executing).then(({ tracked, value }) => {
      executing.delete(tracked);
      return value;
    });

  for (const task of tasks) {
    // Each tracked promise resolves to a record that identifies itself, so
    // the race winner can be removed from the set.
    const tracked = Promise.resolve(task()).then((value) => ({ tracked, value }));
    executing.add(tracked);

    if (executing.size >= concurrency) {
      yield nextSettled();
    }
  }

  // Drain whatever is still running.
  while (executing.size > 0) {
    yield nextSettled();
  }
}
/**
 * Runs `tasks` through `taskQueue` and collects every value it produces.
 *
 * `for await...of` waits for each promise yielded by the queue before asking
 * for the next one, so results are appended in the order the queue reports
 * them.
 *
 * @param {Iterable<function(): Promise<*>>} tasks - Task factories passed to `taskQueue`.
 * @param {number} concurrency - Concurrency limit passed to `taskQueue`.
 * @returns {Promise<Array<*>>} The collected results.
 */
async function runTasks(tasks, concurrency) {
  const results = [];
  for await (const result of taskQueue(tasks, concurrency)) {
    results.push(result);
  }
  return results;
}
// Usage: ten tasks that each resolve with their index after a random delay
// of up to one second, run at most three at a time.
const tasks = Array.from(
  { length: 10 },
  (_, i) => () =>
    new Promise((resolve) => setTimeout(() => resolve(i), Math.random() * 1000)),
);

runTasks(tasks, 3).then(console.log).catch(console.error);

// Section: Retry mechanism (重试机制)
/**
 * Generator that encodes a retry policy for `fn`.
 *
 * On each attempt it yields `fn()` and expects the driver either to send the
 * settled value back with `next(value)`, which ends the generator with that
 * value, or to report a failure with `throw(error)`. After a failed attempt
 * that is not the last one it logs a message and yields a promise that
 * resolves after `delay` milliseconds; the driver resumes it once that
 * promise settles. A failure on the last attempt is rethrown.
 *
 * @param {function(): *} fn - Operation to attempt; usually returns a promise.
 * @param {number} maxAttempts - Maximum number of attempts.
 * @param {number} delay - Pause between attempts, in milliseconds.
 * @yields {*} Either the result of `fn()` or a delay promise.
 * @returns {Generator} Finishes with the value sent in after a successful attempt.
 */
function* retry(fn, maxAttempts, delay) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const result = yield fn();
      return result;
    } catch (error) {
      if (attempt === maxAttempts) {
        throw error;
      }
      console.log(`尝试 ${attempt} 失败,${delay}ms 后重试...`);
      yield new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
/**
 * Drives the `retry` generator: awaits whatever it yields, sends fulfilled
 * values back in, and throws rejections into it.
 *
 * Awaiting is kept separate from resuming the generator, so only a failure of
 * the awaited value is thrown into the generator. An error raised by the
 * generator itself propagates straight to the caller.
 *
 * @param {function(): *} fn - Operation to attempt.
 * @param {number} [maxAttempts=3] - Maximum number of attempts.
 * @param {number} [delay=1000] - Pause between attempts, in milliseconds.
 * @returns {Promise<*>} The value the generator finishes with; rejects with
 *   the error the generator rethrows.
 */
async function runWithRetry(fn, maxAttempts = 3, delay = 1000) {
  const gen = retry(fn, maxAttempts, delay);
  let result = gen.next();

  while (!result.done) {
    let outcome;
    try {
      outcome = { ok: true, value: await result.value };
    } catch (error) {
      outcome = { ok: false, error };
    }
    result = outcome.ok ? gen.next(outcome.value) : gen.throw(outcome.error);
  }

  return result.value;
}

// Section: Advanced state machines (状态机进阶)
### HTTP 请求状态机
/**
 * State machine for one HTTP request at a time, written as a generator.
 *
 * Yielded states, in order:
 *   'idle'     - waits for an action; only 'fetch' starts a request
 *   'loading'  - request accepted
 *   'fetching' - waits for the outcome: `next(data)` reports success,
 *                `throw(error)` reports failure
 *   { state: 'success', data } or { state: 'error', error }
 * The next `next()` after a final state loops back to 'idle'.
 *
 * @yields {string|{state: string}} The current state.
 */
function* httpRequestStateMachine() {
  while (true) {
    const action = yield 'idle';

    if (action === 'fetch') {
      yield 'loading';

      try {
        const response = yield 'fetching';
        yield { state: 'success', data: response };
      } catch (error) {
        yield { state: 'error', error };
      }
    }
  }
}

/**
 * Performs requests with the global `fetch` and mirrors their progress in
 * `state` by stepping an `httpRequestStateMachine`.
 *
 * Overlapping `fetch` calls on one instance are not supported: they would
 * step the same machine concurrently.
 */
class RequestManager {
  constructor() {
    this.machine = httpRequestStateMachine();
    this.state = this.machine.next().value; // 'idle'
  }

  /**
   * Fetches `url` and parses the body as JSON.
   *
   * @param {string} url - Address passed to the global `fetch`.
   * @returns {Promise<*>} The parsed JSON body.
   * @throws Rethrows any error from `fetch` or `response.json()` after the
   *   machine has moved to its error state.
   */
  async fetch(url) {
    // After a previous request the machine rests on its success/error state;
    // step it back to 'idle' so that the 'fetch' action is accepted.
    if (this.state !== 'idle') {
      this.state = this.machine.next().value; // idle
    }

    this.state = this.machine.next('fetch').value; // loading
    // Move on to 'fetching': the only state whose yield is inside the
    // machine's try block and therefore able to receive data or an error.
    this.state = this.machine.next().value; // fetching

    try {
      const response = await fetch(url);
      const data = await response.json();
      this.state = this.machine.next(data).value; // success
      return this.state.data;
    } catch (error) {
      this.state = this.machine.throw(error).value; // error
      throw error;
    }
  }
}

// Section: Game loop (游戏循环)
/**
 * Game state machine. Each step yields a description of the current screen
 * and receives the player's input through `next(input)`.
 *
 * Transitions:
 *   menu     - 'start' -> playing; 'quit' ends the generator with 'game ended'
 *   playing  - 'win' -> victory; 'lose' -> gameOver; 'pause' -> paused
 *   paused   - 'resume' -> playing; 'quit' -> menu
 *   victory  - any input -> menu
 *   gameOver - any input -> menu
 * Unrecognised input leaves the state unchanged.
 *
 * @yields {{state: string, options?: string[], message?: string}} Current screen.
 * @returns {Generator} Finishes with 'game ended' when 'quit' is chosen in the menu.
 */
function* gameLoop() {
  let state = 'menu';

  while (true) {
    // Every case has its own block so its `const` is scoped to that case.
    switch (state) {
      case 'menu': {
        const action = yield { state: 'menu', options: ['start', 'quit'] };
        if (action === 'start') state = 'playing';
        if (action === 'quit') return 'game ended';
        break;
      }

      case 'playing': {
        const event = yield { state: 'playing', message: 'Game running...' };
        if (event === 'win') state = 'victory';
        if (event === 'lose') state = 'gameOver';
        if (event === 'pause') state = 'paused';
        break;
      }

      case 'paused': {
        const resume = yield { state: 'paused', message: 'Game paused' };
        if (resume === 'resume') state = 'playing';
        if (resume === 'quit') state = 'menu';
        break;
      }

      case 'victory': {
        yield { state: 'victory', message: 'You won!' };
        state = 'menu';
        break;
      }

      case 'gameOver': {
        yield { state: 'gameOver', message: 'Game Over' };
        state = 'menu';
        break;
      }
    }
  }
}

// Section: Lazy evaluation (惰性求值)
/**
 * Lazily evaluated, possibly infinite list.
 *
 * An instance wraps a generator function and calls it again on every
 * iteration, so a list can be iterated more than once. `map`, `filter` and
 * `take` return new lists and do no work until the result is iterated.
 */
class LazyList {
  /**
   * @param {function(): Iterable<*>} generator - Called with no arguments each
   *   time the list is iterated; its items are the list's items.
   */
  constructor(generator) {
    this.generator = generator;
  }

  *[Symbol.iterator]() {
    yield* this.generator();
  }

  /**
   * @param {function(*): *} fn - Applied to each item.
   * @returns {LazyList} List of `fn(item)` values.
   */
  map(fn) {
    const source = this;
    return new LazyList(function* () {
      for (const item of source) {
        yield fn(item);
      }
    });
  }

  /**
   * @param {function(*): *} predicate - Items are kept when it returns a truthy value.
   * @returns {LazyList} List of the accepted items.
   */
  filter(predicate) {
    const source = this;
    return new LazyList(function* () {
      for (const item of source) {
        if (predicate(item)) yield item;
      }
    });
  }

  /**
   * Limits the list to its first `n` items. The limit is checked right after
   * an item is yielded, so the upstream list is never asked for an item
   * beyond the n-th; on an infinite filtered source that extra request might
   * never return.
   *
   * @param {number} n - Maximum number of items.
   * @returns {LazyList} List of at most `n` items.
   */
  take(n) {
    const source = this;
    return new LazyList(function* () {
      if (n <= 0) return;

      let taken = 0;
      for (const item of source) {
        yield item;
        taken += 1;
        if (taken >= n) return;
      }
    });
  }

  /**
   * @returns {Array<*>} All items; only safe on a finite list.
   */
  toArray() {
    return [...this];
  }

  /**
   * @param {Iterable<*>} iterable - Items to wrap.
   * @returns {LazyList} List that re-reads `iterable` on every iteration.
   */
  static from(iterable) {
    return new LazyList(function* () {
      yield* iterable;
    });
  }

  /**
   * @param {number} start - First value.
   * @param {number} end - Last value, inclusive; may be `Infinity`.
   * @returns {LazyList} List of start, start + 1, ..., end.
   */
  static range(start, end) {
    return new LazyList(function* () {
      for (let i = start; i <= end; i++) {
        yield i;
      }
    });
  }
}

// Usage: squares of the first five even numbers, taken from an infinite range.
LazyList.range(1, Infinity)
  .filter((n) => n % 2 === 0)
  .map((n) => n * n)
  .take(5)
  .toArray();
// [4, 16, 36, 64, 100]

// Section: Summary (小结)
| 应用场景 | 说明 |
|---|---|
| 异步流程控制 | co 模式,async/await 的前身 |
| 协程 | 多执行流协作 |
| 数据流处理 | 惰性求值、管道转换 |
| 状态机 | 复杂状态转换 |
| 并发控制 | 任务队列、重试机制 |
Generator 是 JavaScript 中非常强大的特性,虽然 async/await 更常用,但理解 Generator 对于深入理解 JavaScript 异步编程至关重要。