环境信息:Node.js 20.x、@nestjs/bull 10.x、bull 4.x、Redis 7.x
为什么需要消息队列#
某些操作比较耗时,不适合在请求中同步执行,比如:
- 发送邮件/短信
- 生成报表
- 视频/图片处理
- 数据导入导出
使用消息队列可以将这些任务异步处理,提高接口响应速度。
安装配置#
Bull 是一个基于 Redis 的队列库:
pnpm add @nestjs/bull bull -Spnpm add @types/bull -D在 AppModule 中配置:
import { BullModule } from '@nestjs/bull'
@Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379, }, }), ],})export class AppModule {}创建队列#
定义一个邮件发送队列:
import { Module } from '@nestjs/common'import { BullModule } from '@nestjs/bull'import { EmailService } from './email.service'import { EmailProcessor } from './email.processor'
@Module({ imports: [ BullModule.registerQueue({ name: 'email', }), ], providers: [EmailService, EmailProcessor], exports: [EmailService],})export class EmailModule {}添加任务#
在 Service 中添加任务到队列:
import { Injectable } from '@nestjs/common'import { InjectQueue } from '@nestjs/bull'import { Queue } from 'bull'
@Injectable()export class EmailService { constructor(@InjectQueue('email') private emailQueue: Queue) {}
// 添加立即执行的任务 async sendWelcomeEmail(userId: number, email: string) { await this.emailQueue.add('welcome', { userId, email, }) }
// 添加延迟执行的任务 async sendReminder(userId: number, email: string) { await this.emailQueue.add( 'reminder', { userId, email }, { delay: 24 * 60 * 60 * 1000, // 24 小时后执行 } ) }
// 添加定时执行的任务 async scheduleReport(userId: number, email: string) { await this.emailQueue.add( 'report', { userId, email }, { repeat: { cron: '0 9 * * 1', // 每周一早上 9 点 }, } ) }}处理任务#
创建 Processor 处理队列中的任务:
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed,} from '@nestjs/bull'import { Job } from 'bull'import { Logger } from '@nestjs/common'
@Processor('email')export class EmailProcessor { private readonly logger = new Logger(EmailProcessor.name)
@Process('welcome') async handleWelcome(job: Job) { const { userId, email } = job.data this.logger.log(`发送欢迎邮件给用户 ${userId}`)
// 模拟发送邮件 await this.sendEmail(email, '欢迎注册', '感谢您的注册!')
return { success: true } }
@Process('reminder') async handleReminder(job: Job) { const { userId, email } = job.data this.logger.log(`发送提醒邮件给用户 ${userId}`)
await this.sendEmail(email, '温馨提醒', '记得完成您的任务!')
return { success: true } }
private async sendEmail(to: string, subject: string, content: string) { // 实际的邮件发送逻辑 await new Promise((resolve) => setTimeout(resolve, 1000)) this.logger.log(`邮件已发送: ${to}`) }
@OnQueueActive() onActive(job: Job) { this.logger.log(`任务开始处理: ${job.id}`) }
@OnQueueCompleted() onCompleted(job: Job, result: any) { this.logger.log(`任务完成: ${job.id}, 结果: ${JSON.stringify(result)}`) }
@OnQueueFailed() onFailed(job: Job, error: Error) { this.logger.error(`任务失败: ${job.id}, 错误: ${error.message}`) }}任务配置#
添加任务时可以配置多种选项:
await this.emailQueue.add( 'send', { email, content }, { delay: 5000, // 延迟 5 秒执行 attempts: 3, // 失败重试 3 次 backoff: { type: 'exponential', delay: 1000, // 重试间隔指数增长 }, removeOnComplete: true, // 完成后删除任务 removeOnFail: false, // 失败后保留任务 priority: 1, // 优先级,数字越小优先级越高 })任务进度#
对于耗时较长的任务,可以报告进度:
@Process('video-transcode')async handleTranscode(job: Job) { const { videoId } = job.data
for (let i = 0; i <= 100; i += 10) { await this.processChunk() await job.progress(i) // 更新进度 }
return { success: true }}前端可以轮询获取进度:
@Get('job/:id/progress')async getJobProgress(@Param('id') id: string) { const job = await this.emailQueue.getJob(id) return { progress: job.progress(), state: await job.getState(), }}并发控制#
限制同时处理的任务数量:
@Processor('email')export class EmailProcessor { // 限制并发为 5 @Process({ name: 'send', concurrency: 5 }) async handleSend(job: Job) { // 处理逻辑 }}也可以在队列级别配置:
BullModule.registerQueue({ name: 'email', limiter: { max: 100, // 每秒最多处理 100 个任务 duration: 1000, },})队列监控#
Bull 提供了 Dashboard 来监控队列状态:
pnpm add @bull-board/express @bull-board/api -Simport { createBullBoard } from '@bull-board/api'import { BullAdapter } from '@bull-board/api/bullAdapter'import { ExpressAdapter } from '@bull-board/express'
async function bootstrap() { const app = await NestFactory.create(AppModule)
const serverAdapter = new ExpressAdapter() serverAdapter.setBasePath('/admin/queues')
const emailQueue = app.get<Queue>('BullQueue_email')
createBullBoard({ queues: [new BullAdapter(emailQueue)], serverAdapter, })
app.use('/admin/queues', serverAdapter.getRouter())
await app.listen(3000)}访问 http://localhost:3000/admin/queues 可以查看队列状态、任务列表、失败任务等信息。