Skip to content

异步迭代

ES2018 引入了异步迭代协议,让我们能够以同步的方式遍历异步数据流。

异步迭代协议#

同步 vs 异步迭代器#

// 同步迭代器
const syncIterator = {
[Symbol.iterator]() {
let i = 0
return {
next() {
if (i < 3) return { value: i++, done: false }
return { done: true }
},
}
},
}
// 异步迭代器
const asyncIterator = {
[Symbol.asyncIterator]() {
let i = 0
return {
async next() {
await delay(100)
if (i < 3) return { value: i++, done: false }
return { done: true }
},
}
},
}
function delay(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}

手动消费异步迭代器#

async function consume() {
const iterator = asyncIterator[Symbol.asyncIterator]()
let result = await iterator.next()
while (!result.done) {
console.log(result.value)
result = await iterator.next()
}
}
consume() // 0, 1, 2(每隔 100ms)

for await…of#

基本用法#

const asyncIterable = {
[Symbol.asyncIterator]() {
let i = 0
return {
next() {
if (i < 3) {
return Promise.resolve({ value: i++, done: false })
}
return Promise.resolve({ done: true })
},
}
},
}
async function example() {
for await (const value of asyncIterable) {
console.log(value)
}
}
example() // 0, 1, 2

遍历 Promise 数组#

async function fetchAll() {
const promises = [
fetch('/api/1').then((r) => r.json()),
fetch('/api/2').then((r) => r.json()),
fetch('/api/3').then((r) => r.json()),
]
// 按顺序获取结果
for await (const result of promises) {
console.log(result)
}
}

🤔 注意:for await…of 遍历 Promise 数组时,会按数组顺序输出,而不是按完成顺序。

异步生成器#

基本语法#

async function* asyncGenerator() {
yield await Promise.resolve(1)
yield await Promise.resolve(2)
yield await Promise.resolve(3)
}
async function example() {
for await (const value of asyncGenerator()) {
console.log(value)
}
}
example() // 1, 2, 3

模拟数据流#

async function* createDataStream() {
let count = 0
while (count < 5) {
// 模拟异步获取数据
await delay(500)
yield { id: count++, timestamp: Date.now() }
}
}
async function consumeStream() {
for await (const data of createDataStream()) {
console.log('收到数据:', data)
}
console.log('数据流结束')
}
consumeStream()

分页数据获取#

async function* fetchPages(baseUrl) {
let page = 1
let hasMore = true
while (hasMore) {
const response = await fetch(`${baseUrl}?page=${page}`)
const data = await response.json()
yield data.items
hasMore = data.hasNextPage
page++
}
}
async function getAllItems() {
const allItems = []
for await (const items of fetchPages('/api/users')) {
allItems.push(...items)
console.log(`获取了 ${items.length} 条数据`)
}
return allItems
}

实际应用场景#

文件逐行读取(Node.js)#

import { createReadStream } from 'fs'
import { createInterface } from 'readline'
async function* readLines(filePath) {
const fileStream = createReadStream(filePath)
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity,
})
for await (const line of rl) {
yield line
}
}
async function processFile() {
let lineNumber = 0
for await (const line of readLines('./data.txt')) {
lineNumber++
console.log(`${lineNumber}: ${line}`)
}
}

WebSocket 消息流#

async function* createWebSocketStream(url) {
const ws = new WebSocket(url)
// 等待连接建立
await new Promise((resolve, reject) => {
ws.onopen = resolve
ws.onerror = reject
})
// 创建消息队列
const messages = []
let resolve
let done = false
ws.onmessage = (event) => {
messages.push(event.data)
if (resolve) {
resolve()
resolve = null
}
}
ws.onclose = () => {
done = true
if (resolve) resolve()
}
try {
while (!done || messages.length > 0) {
if (messages.length === 0) {
await new Promise((r) => {
resolve = r
})
}
if (messages.length > 0) {
yield messages.shift()
}
}
} finally {
ws.close()
}
}
async function handleMessages() {
for await (const message of createWebSocketStream('wss://example.com')) {
console.log('收到消息:', message)
}
}

SSE(Server-Sent Events)处理#

async function* createSSEStream(url) {
const response = await fetch(url)
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() // 保留未完成的行
for (const line of lines) {
if (line.startsWith('data: ')) {
yield JSON.parse(line.slice(6))
}
}
}
}
async function consumeSSE() {
for await (const event of createSSEStream('/api/events')) {
console.log('SSE 事件:', event)
}
}

批量处理带速率限制#

async function* batchProcess(items, batchSize, delayMs) {
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize)
// 并行处理当前批次
const results = await Promise.all(batch.map((item) => processItem(item)))
yield* results
// 批次间延迟
if (i + batchSize < items.length) {
await delay(delayMs)
}
}
}
async function processBatches() {
const items = Array.from({ length: 100 }, (_, i) => i)
for await (const result of batchProcess(items, 10, 1000)) {
console.log('处理结果:', result)
}
}

错误处理#

try/catch 处理#

async function* failingGenerator() {
yield 1
yield 2
throw new Error('生成器内部错误')
yield 3 // 不会执行
}
async function handleErrors() {
try {
for await (const value of failingGenerator()) {
console.log(value)
}
} catch (error) {
console.error('捕获到错误:', error.message)
}
}
handleErrors()
// 1
// 2
// 捕获到错误: 生成器内部错误

在生成器内部处理错误#

async function* resilientGenerator(urls) {
for (const url of urls) {
try {
const response = await fetch(url)
yield await response.json()
} catch (error) {
yield { error: error.message, url }
}
}
}
async function fetchAll() {
for await (const result of resilientGenerator(urls)) {
if (result.error) {
console.error(`获取 ${result.url} 失败:`, result.error)
} else {
console.log('数据:', result)
}
}
}

return 和 throw 方法#

async function* generator() {
try {
yield 1
yield 2
yield 3
} finally {
console.log('清理资源')
}
}
async function example() {
const gen = generator()
console.log(await gen.next()) // { value: 1, done: false }
console.log(await gen.return('提前结束'))
// 清理资源
// { value: '提前结束', done: true }
}

并发控制#

并发异步迭代#

async function* concurrentMap(iterable, fn, concurrency) {
const iterator = iterable[Symbol.asyncIterator]
? iterable[Symbol.asyncIterator]()
: iterable[Symbol.iterator]()
const executing = new Set()
const results = []
let index = 0
async function processNext() {
const { value, done } = await iterator.next()
if (done) return null
const currentIndex = index++
const promise = fn(value, currentIndex).then((result) => ({
index: currentIndex,
result,
}))
executing.add(promise)
promise.finally(() => executing.delete(promise))
return promise
}
// 初始填充
for (let i = 0; i < concurrency; i++) {
const promise = processNext()
if (promise) executing.add(promise)
}
while (executing.size > 0) {
const { index, result } = await Promise.race(executing)
results[index] = result
yield result
const nextPromise = await processNext()
if (nextPromise) executing.add(nextPromise)
}
}
// 使用
async function example() {
const items = [1, 2, 3, 4, 5]
for await (const result of concurrentMap(
items,
async (item) => {
await delay(Math.random() * 1000)
return item * 2
},
3
)) {
console.log(result)
}
}

限速迭代#

async function* throttle(asyncIterable, intervalMs) {
let lastTime = 0
for await (const item of asyncIterable) {
const now = Date.now()
const elapsed = now - lastTime
if (elapsed < intervalMs) {
await delay(intervalMs - elapsed)
}
lastTime = Date.now()
yield item
}
}
async function example() {
const fastStream = createFastStream()
// 每 100ms 最多输出一个值
for await (const value of throttle(fastStream, 100)) {
console.log(value)
}
}

工具函数#

异步迭代器转数组#

async function toArray(asyncIterable) {
const result = []
for await (const item of asyncIterable) {
result.push(item)
}
return result
}
const items = await toArray(asyncGenerator())

map/filter 组合#

async function* asyncMap(iterable, fn) {
for await (const item of iterable) {
yield await fn(item)
}
}
async function* asyncFilter(iterable, predicate) {
for await (const item of iterable) {
if (await predicate(item)) {
yield item
}
}
}
// 链式使用
const result = asyncFilter(asyncMap(dataStream, transform), validate)
for await (const item of result) {
console.log(item)
}

take/skip#

async function* take(iterable, n) {
let count = 0
for await (const item of iterable) {
if (count++ >= n) break
yield item
}
}
async function* skip(iterable, n) {
let count = 0
for await (const item of iterable) {
if (count++ < n) continue
yield item
}
}
// 跳过前 5 个,取接下来的 10 个
for await (const item of take(skip(dataStream, 5), 10)) {
console.log(item)
}

小结#

特性说明
Symbol.asyncIterator定义异步迭代器接口
async function*异步生成器函数
for await…of遍历异步可迭代对象
yield异步生成器中暂停并产出值

异步迭代是处理流式数据、分页数据、WebSocket 消息等异步序列的强大工具。