ES2018 引入了异步迭代协议,让我们能够用类似同步代码的写法(for await…of)来遍历异步数据流。
异步迭代协议#
同步 vs 异步迭代器#
// Synchronous iterable: every [Symbol.iterator]() call starts a fresh 0, 1, 2 sequence.
const syncIterator = {
  [Symbol.iterator]() {
    let i = 0;
    return {
      next() {
        if (i < 3) return { value: i++, done: false };
        return { done: true };
      },
    };
  },
};
// Async iterable: same 0, 1, 2 sequence, but every next() waits 100 ms (via delay) first.
const asyncIterator = {
  [Symbol.asyncIterator]() {
    let i = 0;
    return {
      async next() {
        await delay(100);
        if (i < 3) return { value: i++, done: false };
        return { done: true };
      },
    };
  },
};
function delay(ms) { return new Promise((resolve) => setTimeout(resolve, ms))}

手动消费异步迭代器#
/**
 * Drives `asyncIterator` by hand: awaits next() repeatedly and logs each value
 * until the iterator reports `done`.
 */
async function consume() {
  const iterator = asyncIterator[Symbol.asyncIterator]();

  let result = await iterator.next();
  while (!result.done) {
    console.log(result.value);
    result = await iterator.next();
  }
}
consume() // 0, 1, 2(每隔 100ms)

for await…of#
基本用法#
// Async iterable whose next() is a plain method returning already-resolved promises
// (an async iterator only needs next() to return a promise of { value, done }).
const asyncIterable = {
  [Symbol.asyncIterator]() {
    let i = 0;
    return {
      next() {
        if (i < 3) {
          return Promise.resolve({ value: i++, done: false });
        }
        return Promise.resolve({ done: true });
      },
    };
  },
};
/** Logs every value produced by `asyncIterable`, one per line. */
async function example() {
  for await (const item of asyncIterable) {
    console.log(item);
  }
}
example() // 0, 1, 2

遍历 Promise 数组#
async function fetchAll() { const promises = [ fetch('/api/1').then((r) => r.json()), fetch('/api/2').then((r) => r.json()), fetch('/api/3').then((r) => r.json()), ]
// 按顺序获取结果 for await (const result of promises) { console.log(result) }}🤔 注意:for await…of 遍历 Promise 数组时,会按数组顺序输出,而不是按完成顺序。
异步生成器#
基本语法#
/** Async generator that yields 1, 2, 3, each obtained by awaiting a resolved promise. */
async function* asyncGenerator() {
  yield await Promise.resolve(1);
  yield await Promise.resolve(2);
  yield await Promise.resolve(3);
}
/** Logs every value yielded by `asyncGenerator()`. */
async function example() {
  const source = asyncGenerator();
  for await (const item of source) {
    console.log(item);
  }
}
example() // 1, 2, 3

模拟数据流#
/**
 * Simulated data stream: yields five records `{ id, timestamp }` with ids 0..4,
 * waiting 500 ms (via delay) before each one.
 */
async function* createDataStream() {
  let count = 0;
  while (count < 5) {
    // Simulate fetching data asynchronously.
    await delay(500);
    yield { id: count++, timestamp: Date.now() };
  }
}
/** Logs each record from `createDataStream()`, then logs a final end-of-stream message. */
async function consumeStream() {
  const stream = createDataStream();
  for await (const record of stream) {
    console.log('收到数据:', record);
  }
  console.log('数据流结束');
}
consumeStream()

分页数据获取#
/**
 * Lazily walks a paginated endpoint, yielding each page's `items`.
 * Requests `${baseUrl}?page=N` starting at page 1 and keeps going while the
 * response body reports `hasNextPage`.
 *
 * @param {string} baseUrl - Endpoint URL without the `page` query parameter.
 * @yields The `items` value of one page.
 * @throws {Error} When a page request returns a non-2xx status.
 */
async function* fetchPages(baseUrl) {
  let page = 1;
  let hasMore = true;

  while (hasMore) {
    const response = await fetch(`${baseUrl}?page=${page}`);
    // Fail loudly instead of trying to read `items` out of an error body.
    if (!response.ok) {
      throw new Error(`Request for page ${page} failed with status ${response.status}`);
    }
    const data = await response.json();

    yield data.items;

    hasMore = data.hasNextPage;
    page++;
  }
}
async function getAllItems() { const allItems = []
for await (const items of fetchPages('/api/users')) { allItems.push(...items) console.log(`获取了 ${items.length} 条数据`) }
return allItems}实际应用场景#
文件逐行读取(Node.js)#
import { createReadStream } from 'fs'
import { createInterface } from 'readline'
/**
 * Yields the lines of a text file one at a time without loading the whole file.
 *
 * @param {string} filePath - Path of the file to read.
 * @yields {string} One line, without its line terminator.
 */
async function* readLines(filePath) {
  const fileStream = createReadStream(filePath);
  const rl = createInterface({
    input: fileStream,
    // Infinity makes readline treat "\r\n" as a single line break.
    crlfDelay: Infinity,
  });

  try {
    for await (const line of rl) {
      yield line;
    }
  } finally {
    // Runs on normal completion, on errors, and when the consumer stops early,
    // so the readline interface and the file handle are always released.
    rl.close();
    fileStream.destroy();
  }
}
async function processFile() { let lineNumber = 0 for await (const line of readLines('./data.txt')) { lineNumber++ console.log(`${lineNumber}: ${line}`) }}WebSocket 消息流#
/**
 * Exposes the messages of a WebSocket as an async iterable.
 * The generator finishes once the socket has closed and all buffered messages
 * have been yielded; the socket is closed whenever the generator exits.
 *
 * @param {string} url - WebSocket endpoint.
 * @yields The `data` of each received message, in arrival order.
 */
async function* createWebSocketStream(url) {
  const ws = new WebSocket(url);

  // Wait for the connection to be established (rejects with the error event on failure).
  await new Promise((resolve, reject) => {
    ws.onopen = resolve;
    ws.onerror = reject;
  });

  // Message queue plus a wake-up hook for the consumer loop below.
  const messages = [];
  let resolve;
  let done = false;

  ws.onmessage = (event) => {
    messages.push(event.data);
    if (resolve) {
      resolve();
      resolve = null;
    }
  };

  ws.onclose = () => {
    done = true;
    if (resolve) resolve();
  };

  try {
    while (!done || messages.length > 0) {
      if (messages.length === 0) {
        // Nothing buffered: sleep until the next message or the close event.
        await new Promise((r) => {
          resolve = r;
        });
      }
      if (messages.length > 0) {
        yield messages.shift();
      }
    }
  } finally {
    ws.close();
  }
}
async function handleMessages() { for await (const message of createWebSocketStream('wss://example.com')) { console.log('收到消息:', message) }}

SSE(Server-Sent Events)处理#
/**
 * Reads a Server-Sent Events response and yields the JSON payload of every
 * `data:` line as soon as that line is complete.
 *
 * @param {string} url - Endpoint that streams `data: <json>` lines.
 * @yields The value parsed from each `data:` line.
 * @throws {Error} When the response status is not 2xx.
 * @throws {SyntaxError} When a `data:` payload is not valid JSON.
 */
async function* createSSEStream(url) {
  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`SSE request failed with status ${response.status}`);
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  // Returns the payload of a `data:` line, or null for any other line.
  // Accepts "\r\n" line endings and an optional single space after the colon.
  const dataOf = (rawLine) => {
    const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
    if (!line.startsWith('data:')) return null;
    const payload = line.slice(5);
    return payload.startsWith(' ') ? payload.slice(1) : payload;
  };

  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });

      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep the trailing, possibly incomplete line

      for (const line of lines) {
        const payload = dataOf(line);
        if (payload !== null) {
          yield JSON.parse(payload);
        }
      }
    }

    // The stream ended: flush the decoder and emit a final line that had no
    // trailing newline instead of silently dropping it.
    buffer += decoder.decode();
    const lastPayload = dataOf(buffer);
    if (lastPayload !== null) {
      yield JSON.parse(lastPayload);
    }
  } finally {
    // Stops the download when the consumer exits early or parsing throws.
    // Best-effort: a failure to cancel must not mask the original error.
    await reader.cancel().catch(() => {});
  }
}
async function consumeSSE() { for await (const event of createSSEStream('/api/events')) { console.log('SSE 事件:', event) }}

批量处理带速率限制#
/**
 * Processes `items` in fixed-size batches: each batch runs in parallel through
 * `processItem`, its results are yielded in input order, and the generator
 * pauses `delayMs` between batches (not after the last one).
 *
 * @param {Array} items - Inputs to process.
 * @param {number} batchSize - Items per batch; must be greater than 0.
 * @param {number} delayMs - Pause between batches, passed to `delay`.
 * @yields The result of `processItem` for each item.
 * @throws {RangeError} When `batchSize` is not greater than 0.
 */
async function* batchProcess(items, batchSize, delayMs) {
  // A non-positive step would never advance the loop below.
  if (!(batchSize > 0)) {
    throw new RangeError(`batchSize must be greater than 0, got ${batchSize}`);
  }

  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);

    // Process the current batch in parallel.
    const results = await Promise.all(batch.map((item) => processItem(item)));

    yield* results;

    // Pause between batches.
    if (i + batchSize < items.length) {
      await delay(delayMs);
    }
  }
}
async function processBatches() { const items = Array.from({ length: 100 }, (_, i) => i)
for await (const result of batchProcess(items, 10, 1000)) { console.log('处理结果:', result) }}

错误处理#
try/catch 处理#
/** Demo generator: yields 1 and 2, then throws, so the final yield is never reached. */
async function* failingGenerator() {
  yield 1;
  yield 2;
  throw new Error('生成器内部错误');
  yield 3; // never executed; kept to show that the throw ends the generator
}
/**
 * Logs each value from `failingGenerator()`; an error thrown by the generator
 * surfaces in the loop and is reported through console.error.
 */
async function handleErrors() {
  try {
    const source = failingGenerator();
    for await (const item of source) {
      console.log(item);
    }
  } catch (err) {
    console.error('捕获到错误:', err.message);
  }
}
handleErrors()
// 1
// 2
// 捕获到错误: 生成器内部错误

在生成器内部处理错误#
/**
 * Fetches each URL in order and yields its JSON body. A failure for one URL is
 * turned into an `{ error, url }` record so the remaining URLs are still processed.
 *
 * @param {Iterable<string>} urls - URLs to fetch sequentially.
 * @yields The parsed JSON body, or `{ error: string, url: string }` on failure.
 */
async function* resilientGenerator(urls) {
  for (const url of urls) {
    try {
      const response = await fetch(url);
      yield await response.json();
    } catch (error) {
      yield { error: error.message, url };
    }
  }
}
async function fetchAll() { for await (const result of resilientGenerator(urls)) { if (result.error) { console.error(`获取 ${result.url} 失败:`, result.error) } else { console.log('数据:', result) } }}

return 和 throw 方法#
/** Yields 1, 2, 3 and logs a cleanup message from `finally` whenever the generator exits. */
async function* generator() {
  try {
    yield 1;
    yield 2;
    yield 3;
  } finally {
    console.log('清理资源');
  }
}
async function example() { const gen = generator()
console.log(await gen.next()) // { value: 1, done: false } console.log(await gen.return('提前结束')) // 清理资源 // { value: '提前结束', done: true }}并发控制#
并发异步迭代#
/**
 * Maps `fn` over a sync or async iterable with at most `concurrency` calls in
 * flight, yielding each result as soon as it is ready (completion order, not
 * input order).
 *
 * @param {Iterable|AsyncIterable} iterable - Source of input values.
 * @param {(value: any, index: number) => any} fn - Mapper; may return a promise.
 * @param {number} concurrency - Maximum number of `fn` calls running at once.
 * @yields The result of `fn` for one input value.
 * @throws Whatever the source or `fn` throws; the first failure ends the iteration.
 */
async function* concurrentMap(iterable, fn, concurrency) {
  const iterator = iterable[Symbol.asyncIterator]
    ? iterable[Symbol.asyncIterator]()
    : iterable[Symbol.iterator]();

  // Task id -> promise settling to { id, result }. Keying by id lets the loop
  // remove exactly the task that just won the race.
  const inFlight = new Map();
  let index = 0;
  let sourceDone = false;

  // Pulls one value from the source (if any is left) and starts fn on it.
  const startNext = async () => {
    if (sourceDone) return;

    let step;
    try {
      step = await iterator.next();
    } catch (err) {
      sourceDone = true; // a source that threw must not be asked to return() later
      throw err;
    }
    if (step.done) {
      sourceDone = true;
      return;
    }

    const id = index++;
    const task = (async () => ({ id, result: await fn(step.value, id) }))();
    // Mark the task as handled so a failure that happens while the loop is busy
    // elsewhere is not reported as an unhandled rejection; it still surfaces
    // through Promise.race below. Rejections of tasks that are never raced
    // (the consumer stopped early) are intentionally dropped.
    task.catch(() => {});
    inFlight.set(id, task);
  };

  try {
    // Initial fill.
    for (let i = 0; i < concurrency; i++) {
      await startNext();
    }

    while (inFlight.size > 0) {
      const { id, result } = await Promise.race(inFlight.values());
      inFlight.delete(id);
      yield result;

      // Refill the slot that was just freed.
      await startNext();
    }
  } finally {
    // Let the source release its resources when iteration stops early.
    if (!sourceDone) {
      await iterator.return?.();
    }
  }
}
// 使用async function example() { const items = [1, 2, 3, 4, 5]
for await (const result of concurrentMap( items, async (item) => { await delay(Math.random() * 1000) return item * 2 }, 3 )) { console.log(result) }}限速迭代#
/**
 * Re-emits the items of `asyncIterable` with at least `intervalMs` between
 * consecutive items. The first item is passed through without waiting.
 *
 * @param {AsyncIterable|Iterable} asyncIterable - Source stream.
 * @param {number} intervalMs - Minimum gap between emitted items, in milliseconds.
 * @yields Each source item, unchanged.
 */
async function* throttle(asyncIterable, intervalMs) {
  let lastTime = 0;

  for await (const item of asyncIterable) {
    const now = Date.now();
    const elapsed = now - lastTime;

    if (elapsed < intervalMs) {
      await delay(intervalMs - elapsed);
    }

    lastTime = Date.now();
    yield item;
  }
}
async function example() { const fastStream = createFastStream()
// 每 100ms 最多输出一个值 for await (const value of throttle(fastStream, 100)) { console.log(value) }}工具函数#
异步迭代器转数组#
/**
 * Drains an async (or sync) iterable into an array.
 *
 * @param {AsyncIterable|Iterable} asyncIterable - Source to consume completely.
 * @returns {Promise<Array>} All items in iteration order.
 */
async function toArray(asyncIterable) {
  const result = [];
  for await (const item of asyncIterable) {
    result.push(item);
  }
  return result;
}
const items = await toArray(asyncGenerator())

map/filter 组合#
/**
 * Lazily applies `fn` to every item of `iterable`, awaiting each result
 * before yielding it.
 */
async function* asyncMap(iterable, fn) {
  for await (const element of iterable) {
    const mapped = await fn(element);
    yield mapped;
  }
}
/**
 * Lazily yields only the items of `iterable` for which `predicate`
 * (sync or async) returns a truthy value.
 */
async function* asyncFilter(iterable, predicate) {
  for await (const element of iterable) {
    const keep = await predicate(element);
    if (!keep) continue;
    yield element;
  }
}
// 链式使用
const result = asyncFilter(asyncMap(dataStream, transform), validate)

for await (const item of result) { console.log(item)}

take/skip#
/**
 * Yields at most the first `n` items of `iterable`, then stops.
 * The source is never asked for more than `n` items, so nothing is pulled and
 * thrown away, and the generator finishes as soon as the n-th item is delivered.
 *
 * @param {AsyncIterable|Iterable} iterable - Source stream.
 * @param {number} n - Maximum number of items to yield.
 * @yields The first `n` items of the source.
 */
async function* take(iterable, n) {
  if (n <= 0) return;

  let count = 0;
  for await (const item of iterable) {
    yield item;
    // Stop right after the n-th item instead of waiting for item n+1.
    if (++count >= n) return;
  }
}
/**
 * Drops the first `n` items of `iterable` and yields everything after them.
 *
 * @param {AsyncIterable|Iterable} iterable - Source stream.
 * @param {number} n - Number of leading items to discard.
 * @yields Every item after the first `n`.
 */
async function* skip(iterable, n) {
  let count = 0;
  for await (const item of iterable) {
    if (count++ < n) continue;
    yield item;
  }
}
// 跳过前 5 个,取接下来的 10 个
for await (const item of take(skip(dataStream, 5), 10)) { console.log(item)}

小结#
| 特性 | 说明 |
|---|---|
| Symbol.asyncIterator | 定义异步迭代器接口 |
| async function* | 异步生成器函数 |
| for await…of | 遍历异步可迭代对象 |
| yield | 异步生成器中暂停并产出值 |
异步迭代是处理流式数据、分页数据、WebSocket 消息等异步序列的强大工具。