Bull库

Bull - Redis 驱动的 Node.js 任务队列库

#resource / javascript #tech / dev / backend #type / howto #status / growing

Bull 任务队列库

[!info] related notes

Bull 是一个基于 Redis 的 Node.js 高性能任务队列库,用于处理后台任务、分布式作业和消息队列。

基础概念

核心特点

  • 基于 Redis:利用 Redis 实现高性能、持久化的任务队列
  • 分布式处理:支持多进程/多服务器并行处理任务
  • 任务调度:支持延迟任务、重复任务、优先级任务
  • 可靠性:内置重试、失败处理、超时机制
  • 监控友好:提供丰富的事件和 Dashboard 工具

应用场景

场景说明
邮件发送批量邮件、延迟发送
文件处理图片压缩、视频转码
数据同步数据库同步、缓存更新
定时任务Cron 式定时执行
第三方 API限速调用、重试机制

使用指南

安装

npm install bull
npm install @types/bull -D  # TypeScript 支持

确保有可用的 Redis 实例:

docker run -d --name redis -p 6379:6379 redis:alpine

基础用法

1. 创建队列

import Bull from 'bull';

// 创建队列
const emailQueue = new Bull('email-queue', {
  redis: {
    host: 'localhost',
    port: 6379,
  },
});

// 添加任务
await emailQueue.add({
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Hello, welcome to our service.',
});

2. 处理任务

// 定义任务处理器
emailQueue.process(async (job) => {
  console.log(`Processing job ${job.id}`);
  console.log('Data:', job.data);
  
  await sendEmail(job.data);
  
  return { sent: true };
});

3. 延迟任务

// 5分钟后执行
await queue.add(data, { delay: 5 * 60 * 1000 });

4. 重复任务(Cron)

// 每天早上9点执行
await queue.add(data, {
  repeat: {
    cron: '0 9 * * *',
  },
});

5. 优先级任务

// 优先级:数字越小优先级越高
await queue.add(highPriorityData, { priority: 1 });
await queue.add(normalData, { priority: 5 });
await queue.add(lowPriorityData, { priority: 10 });

事件监听

queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

queue.on('failed', (job, error) => {
  console.log(`Job ${job.id} failed:`, error.message);
});

queue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} is ${progress}% complete`);
});

任务进度报告

queue.process(async (job) => {
  for (let i = 0; i <= 100; i += 10) {
    await someWork();
    job.progress(i);  // 报告进度
  }
  return 'Done';
});

高级配置

重试策略

await queue.add(data, {
  attempts: 3,           // 最多重试3次
  backoff: {
    type: 'exponential', // 指数退避
    delay: 1000,         // 初始延迟1秒
  },
});

并发控制

// 同时处理最多5个任务
queue.process(5, async (job) => {
  // ...
});

任务超时

await queue.add(data, {
  timeout: 30000,  // 30秒超时
});

实战经验

最佳实践

  1. 幂等设计:任务处理器应设计为幂等,因为可能重复执行
  2. 错误处理:始终捕获异常,让任务正确失败而非崩溃
  3. 监控告警:使用 Bull Dashboard 或 Arena 监控队列健康
  4. 清理策略:定期清理已完成任务,避免 Redis 内存膨胀

监控工具

# Bull Board(推荐)
npm install @bull-board/express @bull-board/api

# Arena
npm install bull-arena

参考资料

[!tip] 提示 Bull 已有继任者 BullMQ,新项目可考虑直接使用 BullMQ,它提供更好的 TypeScript 支持和更多功能。

创建于 2025/8/10 更新于 2026/5/27