在之前使用 nestjs 做一个项目时用到了队列的功能。官方推荐的包是 @nestjs/bullmq。用下来还是比较方便的,我们今天一起来看看它底层的 BullMQ 这个包的一些内容。
BullMQ
bullmq.io

在使用之前,我们要明确一些事情!
1. 为什么要有消息队列?
这里照搬 BullMQ 官网的说法:
- 解耦组件 消息队列是解耦应用程序组件的绝佳方式。它们也是通过将负载分配到多个工作进程来扩展应用程序的绝佳方式。
- 提高可靠性 通过向与其他服务或组件通信的作业添加重试和延迟来提高可靠性。
- 将任务拆分为更小的部分 将一个大的任务分解成更小的、独立的任务,可以更容易地扩展你的应用程序,并且在其中一个任务失败时也能提高应用程序的可靠性。
- 卸载长时间运行的任务 在后台运行长时间运行的任务,让用户在任务于工作进程池中运行时继续使用您的应用程序。
- 遵守现有的 API 通过在应用程序调用外部 API 之间添加队列并启用速率限制,来限制应用程序调用外部 API 的速度。
- 安排作业 作业可以延迟执行,以便在特定时间点运行,或者您可以按给定的时间间隔运行作业。
2. 在什么时候使用
- 如果你有一些特别耗时的任务,而且用户不需要立即知道处理结果(文件处理——音视频压缩处理等)
- 用户注册后需要发送短信、邮件等
- 缓存突发流量,避免系统过载
- 日志收集等操作
接下来我们看一下 BullMQ 的简单使用。
关联包安装
- bullmq
$pnpm install bullmq- redis 这里选择 ioredis
$pnpm install ioredis基础语法
添加任务
import { Queue } from 'bullmq';
const myQueue = new Queue('foo');
async function addJobs() {
await myQueue.add('myJobName', { foo: 'bar' });
await myQueue.add('myJobName', { qux: 'baz' }, { delay: 5000 }); // 延迟作业,该任务将至少等待 5 秒钟才会进行处理
}
await addJobs();
消费
import { Worker } from 'bullmq';
import IORedis from 'ioredis';
const connection = new IORedis({ maxRetriesPerRequest: null });
const worker = new Worker(
'foo',
async (job) => {
// 先打印 { foo: 'bar'}
// 然后打印 { qux: 'baz' }
console.log(job.data);
},
{ connection },
);
监听任务状态
// 成功
worker.on('completed', (job) => {
console.log(`${job.id} has completed!`);
});
// 失败
worker.on('failed', (job, err) => {
console.log(`${job.id} has failed with ${err.message}`);
});
自动删除已完成的任务
await myQueue.add(
'test',
{ foo: 'bar' },
{ removeOnComplete: true, removeOnFail: true }, // 成功和失败的任务都删除
);
添加任务是异步的,我们可以使用 addBulk 进行批量添加
import { Queue } from 'bullmq';
const queue = new Queue('paint');
const name = 'jobName';
const jobs = await queue.addBulk([
{ name, data: { paint: 'car' } },
{ name, data: { paint: 'house' } },
{ name, data: { paint: 'boat' } },
]);
速率限制
我们可以配置在一段时间内处理多少个作业
全局
import { Queue } from 'bullmq';
// 1 秒处理一个
await queue.setGlobalRateLimit(1, 1000);
单个任务
import { QueueScheduler, Worker } from 'bullmq';
const worker = new Worker('painter',
async job => paintCar(job),
{
limiter: {
max: 10,
duration: 1000,
},
});
在失败时重试
什么情况下 BullMQ 认为是失败了
- Worker 中抛出异常
- Worker 陷入停滞
如何配置失败时重试?
固定重试3次,每次延迟3秒
import { Queue } from 'bullmq';
const myQueue = new Queue('foo');
await queue.add(
'test-retry',
{ foo: 'bar' },
{
attempts: 3,
backoff: {
type: 'fixed',
delay: 1000,
},
},
);
可以配置 jitter 来实现随机抖动。延迟时间会在 1000ms ~ 1000 * 0.5 ms 之间随机
import { Queue } from 'bullmq';
const myQueue = new Queue('foo');
await queue.add(
'test-retry',
{ foo: 'bar' },
{
attempts: 8,
backoff: {
type: 'fixed',
delay: 1000,
jitter: 0.5,
},
},
);
扩展
使用 bull-board 监控和管理
它可以结合很多 node 框架使用。比如有常见的 express, fastify, koa, nestjs, hono, h3, elysia 等
我这几天看 hono 有点多,这里使用 hono 扩展。
pnpm install @bull-board/hono
import { createBullBoard } from '@bull-board/api'
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'
import { HonoAdapter } from '@bull-board/hono'
import { serveStatic } from '@hono/node-server/serve-static'
import { emailQueue } from '@/queues/email.queue'
const serverAdapter = new HonoAdapter(serveStatic)
serverAdapter.setBasePath('/admin/queues')
createBullBoard({ queues: [new BullMQAdapter(emailQueue)], serverAdapter })
export const queueUI = serverAdapter.registerPlugin()
import { createApp } from '@/lib/create-app'
import { queueUI } from '@/routes/admin/queue'
const app = createApp()
app.route('/admin/queues', queueUI)
export { app }
