Skip to content

Bull队列与异步任务

环境信息:Node.js 20.x、@nestjs/bull 10.x、bull 4.x、Redis 7.x

为什么需要消息队列#

某些操作比较耗时,不适合在请求中同步执行,比如:

使用消息队列可以将这些任务异步处理,提高接口响应速度。

安装配置#

Bull 是一个基于 Redis 的队列库:

Terminal window
pnpm add @nestjs/bull bull -S
pnpm add @types/bull -D

在 AppModule 中配置:

import { BullModule } from '@nestjs/bull'
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}

创建队列#

定义一个邮件发送队列:

email.module.ts
import { Module } from '@nestjs/common'
import { BullModule } from '@nestjs/bull'
import { EmailService } from './email.service'
import { EmailProcessor } from './email.processor'
@Module({
imports: [
BullModule.registerQueue({
name: 'email',
}),
],
providers: [EmailService, EmailProcessor],
exports: [EmailService],
})
export class EmailModule {}

添加任务#

在 Service 中添加任务到队列:

email.service.ts
import { Injectable } from '@nestjs/common'
import { InjectQueue } from '@nestjs/bull'
import { Queue } from 'bull'
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
// 添加立即执行的任务
async sendWelcomeEmail(userId: number, email: string) {
await this.emailQueue.add('welcome', {
userId,
email,
})
}
// 添加延迟执行的任务
async sendReminder(userId: number, email: string) {
await this.emailQueue.add(
'reminder',
{ userId, email },
{
delay: 24 * 60 * 60 * 1000, // 24 小时后执行
}
)
}
// 添加定时执行的任务
async scheduleReport(userId: number, email: string) {
await this.emailQueue.add(
'report',
{ userId, email },
{
repeat: {
cron: '0 9 * * 1', // 每周一早上 9 点
},
}
)
}
}

处理任务#

创建 Processor 处理队列中的任务:

email.processor.ts
import {
Processor,
Process,
OnQueueActive,
OnQueueCompleted,
OnQueueFailed,
} from '@nestjs/bull'
import { Job } from 'bull'
import { Logger } from '@nestjs/common'
@Processor('email')
export class EmailProcessor {
private readonly logger = new Logger(EmailProcessor.name)
@Process('welcome')
async handleWelcome(job: Job) {
const { userId, email } = job.data
this.logger.log(`发送欢迎邮件给用户 ${userId}`)
// 模拟发送邮件
await this.sendEmail(email, '欢迎注册', '感谢您的注册!')
return { success: true }
}
@Process('reminder')
async handleReminder(job: Job) {
const { userId, email } = job.data
this.logger.log(`发送提醒邮件给用户 ${userId}`)
await this.sendEmail(email, '温馨提醒', '记得完成您的任务!')
return { success: true }
}
private async sendEmail(to: string, subject: string, content: string) {
// 实际的邮件发送逻辑
await new Promise((resolve) => setTimeout(resolve, 1000))
this.logger.log(`邮件已发送: ${to}`)
}
@OnQueueActive()
onActive(job: Job) {
this.logger.log(`任务开始处理: ${job.id}`)
}
@OnQueueCompleted()
onCompleted(job: Job, result: any) {
this.logger.log(`任务完成: ${job.id}, 结果: ${JSON.stringify(result)}`)
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
this.logger.error(`任务失败: ${job.id}, 错误: ${error.message}`)
}
}

任务配置#

添加任务时可以配置多种选项:

await this.emailQueue.add(
'send',
{ email, content },
{
delay: 5000, // 延迟 5 秒执行
attempts: 3, // 失败重试 3 次
backoff: {
type: 'exponential',
delay: 1000, // 重试间隔指数增长
},
removeOnComplete: true, // 完成后删除任务
removeOnFail: false, // 失败后保留任务
priority: 1, // 优先级,数字越小优先级越高
}
)

任务进度#

对于耗时较长的任务,可以报告进度:

@Process('video-transcode')
async handleTranscode(job: Job) {
const { videoId } = job.data
for (let i = 0; i <= 100; i += 10) {
await this.processChunk()
await job.progress(i) // 更新进度
}
return { success: true }
}

前端可以轮询获取进度:

@Get('job/:id/progress')
async getJobProgress(@Param('id') id: string) {
const job = await this.emailQueue.getJob(id)
return {
progress: job.progress(),
state: await job.getState(),
}
}

并发控制#

限制同时处理的任务数量:

@Processor('email')
export class EmailProcessor {
// 限制并发为 5
@Process({ name: 'send', concurrency: 5 })
async handleSend(job: Job) {
// 处理逻辑
}
}

也可以在队列级别配置:

BullModule.registerQueue({
name: 'email',
limiter: {
max: 100, // 每秒最多处理 100 个任务
duration: 1000,
},
})

队列监控#

Bull 提供了 Dashboard 来监控队列状态:

Terminal window
pnpm add @bull-board/express @bull-board/api -S
import { createBullBoard } from '@bull-board/api'
import { BullAdapter } from '@bull-board/api/bullAdapter'
import { ExpressAdapter } from '@bull-board/express'
async function bootstrap() {
const app = await NestFactory.create(AppModule)
const serverAdapter = new ExpressAdapter()
serverAdapter.setBasePath('/admin/queues')
const emailQueue = app.get<Queue>('BullQueue_email')
createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter,
})
app.use('/admin/queues', serverAdapter.getRouter())
await app.listen(3000)
}

访问 http://localhost:3000/admin/queues 可以查看队列状态、任务列表、失败任务等信息。