agenda、bee-queue、bull、bullmq 和 kue 都是 Node.js 生态中用于处理后台作业和定时任务的队列库。它们允许开发者将耗时的操作(如发送邮件、处理图像或生成报告)从主请求循环中剥离,异步执行以提高应用响应速度。agenda 基于 MongoDB,适合需要持久化和复杂调度的场景;bull 和 bullmq 基于 Redis,性能更高,其中 bullmq 是现代化的重构版本;bee-queue 追求极简的 Redis 实现;而 kue 是早期的 Redis 队列方案,目前已不再维护。
在构建高可用的 Node.js 应用时,将耗时任务移出主线程是标准做法。agenda、bee-queue、bull、bullmq 和 kue 都旨在解决这一问题,但它们的底层架构、存储引擎和维护状态截然不同。本文将从架构、API 设计和维护性三个维度进行深度剖析。
选择队列库的核心在于理解数据存哪里以及如何取出来。
agenda 使用 MongoDB 存储作业。
// agenda: 基于 MongoDB
const agenda = new Agenda({ db: { address: 'mongodb://localhost/agenda' } });
await agenda.start();
bee-queue 使用 Redis,强调极简。
// bee-queue: 基于 Redis
const Queue = require('bee-queue');
const queue = new Queue('my-queue', { redis: { host: 'localhost' } });
bull 使用 Redis 和 Lua 脚本。
// bull: 基于 Redis
const Queue = require('bull');
const queue = new Queue('my-queue', 'redis://localhost');
bullmq 使用 Redis 5+,架构分离。
Queue(提交作业)和 Worker(处理作业)。// bullmq: 基于 Redis 5+
const { Queue, Worker } = require('bullmq');
const queue = new Queue('my-queue', { connection: { host: 'localhost' } });
const worker = new Worker('my-queue', async (job) => {}, { connection: { host: 'localhost' } });
kue 使用 Redis,旧式架构。
// kue: 基于 Redis (已弃用)
const kue = require('kue');
const queue = kue.createQueue();
不同库在创建任务时的 API 风格差异明显,这直接影响代码的可读性。
agenda 需要先定义任务名称,再调度。
// agenda: 定义并调度
agenda.define('send email', async (job) => { /*...*/ });
await agenda.every('5 minutes', 'send email', { to: 'user@example.com' });
bee-queue 直接创建作业对象并保存。
// bee-queue: 创建并保存
const job = queue.createJob({ to: 'user@example.com' });
await job.save();
bull 通过队列实例直接添加数据。
// bull: 直接添加
await queue.add({ to: 'user@example.com' }, { attempts: 3 });
bullmq 与 bull 类似,但配置更现代化。
// bullmq: 直接添加
await queue.add('send email', { to: 'user@example.com' }, { attempts: 3 });
kue 类似 bee-queue,创建后保存。
// kue: 创建并保存 (已弃用)
const job = queue.create('send email', { to: 'user@example.com' });
job.save();
当任务被执行时,各库的处理器写法有所不同,特别是在异步控制和完成信号上。
agenda 使用 async/await 或回调。
done() 决定。// agenda: 处理器
agenda.define('send email', async (job) => {
await sendEmail(job.attrs.data.to);
});
bee-queue 使用回调或 Promise。
done() 或返回 Promise。// bee-queue: 处理器
queue.process(async (job, done) => {
await sendEmail(job.data.to);
done();
});
bull 支持 Promise 或回调。
// bull: 处理器
queue.process(async (job) => {
await sendEmail(job.data.to);
});
bullmq 必须在 Worker 中定义处理器。
// bullmq: Worker 处理器
const worker = new Worker('my-queue', async (job) => {
await sendEmail(job.data.to);
}, { connection: redisConfig });
kue 使用回调风格。
done()。// kue: 处理器 (已弃用)
queue.process('send email', (job, done) => {
sendEmail(job.data.to, done);
});
对于需要周期性执行的任务,各库的支持程度不一。
agenda 原生支持 cron 语法。
agenda 的强项,非常适合定时调度。// agenda: 原生 Cron
await agenda.every('*/5 * * * *', 'send email');
bee-queue 不原生支持重复任务。
// bee-queue: 无原生支持
// 需在任务完成后手动再次 job.save() 实现循环
bull 支持重复任务配置。
repeat 选项实现。// bull: 重复配置
await queue.add({ to: 'user' }, { repeat: { cron: '*/5 * * * *' } });
bullmq 支持更强大的重复配置。
// bullmq: 重复配置
await queue.add('send email', { to: 'user' }, { repeat: { pattern: '*/5 * * * *' } });
kue 支持延迟和简单重复。
// kue: 延迟任务 (已弃用)
const job = queue.create('email', data).delay(2000);
job.save();
这是架构选型中最关键的一环,直接影响项目的长期稳定性。
| 包名 | 存储 | 维护状态 | 建议 |
|---|---|---|---|
agenda | MongoDB | ✅ 活跃 | 适合 Mongo 用户 |
bee-queue | Redis | ⚠️ 低活跃 | 适合简单场景 |
bull | Redis | ⚠️ 维护模式 | 旧项目继续,新项目慎用 |
bullmq | Redis 5+ | ✅ 活跃 | ✅ 新项目首选 |
kue | Redis | ❌ 已弃用 | ❌ 禁止使用 |
kue 已明确标记为不再维护,存在已知漏洞,切勿在生产环境使用。
bull 虽然稳定,但官方推荐迁移至 bullmq 以获取更好的性能和类型支持。
agenda 是 MongoDB 生态下的最佳选择,但需注意数据库写入压力。
bullmq 是目前 Node.js 生态中最均衡的选择 — 性能强大、架构现代、社区活跃。如果你的基础设施支持 Redis 5+,它是默认选项。
agenda 适合那些已经重度依赖 MongoDB 且需要复杂定时调度的团队 — 避免引入额外的 Redis 依赖。
bee-queue 适合极简主义项目 — 如果你只需要一个简单的 FIFO 队列且不想配置复杂的环境。
kue 和 bull (3.x) 属于历史遗产 — 除非维护旧系统,否则不应出现在新的架构设计图中。选择正确的工具能让后台任务管理变得简单 — 而不是成为技术债务的来源。
如果你的项目已经使用 MongoDB,或者需要复杂的 cron 表达式调度以及作业内容的持久化存储,选择 agenda 是最自然的决定。它适合对任务状态查询要求高、且能接受比 Redis 稍慢写入速度的场景。
选择 bee-queue 适合需要极简 API、低内存占用且基于 Redis 的轻量级项目。它的代码库很小,易于理解,但社区活跃度相对较低,适合不需要复杂高级功能的简单队列需求。
如果你的项目是旧系统维护,或者需要大量现有的 bull 3.x 生态插件,可以继续使用 bull。但需注意它已进入维护模式,新功能开发建议转向 bullmq,适合追求稳定且不想大幅重构的现有 Redis 队列用户。
对于全新的 Node.js 项目,bullmq 是首选方案。它采用了现代架构,将队列定义与 worker 处理分离,支持 Redis 5+ 的高级特性,性能更优且类型支持更好,适合高并发和长期维护的生产环境。
切勿在新项目中选择 kue。该库已正式弃用且多年未维护,存在未修复的安全漏洞和兼容性问题。如果当前正在使用,应尽快制定迁移计划,转向 bullmq 或 agenda 等活跃维护的替代方案。
A light-weight job scheduling library for Node.js
Migrating from v5? See the Migration Guide for all breaking changes.
Agenda 6.x is a complete TypeScript rewrite with a focus on modularity and flexibility:
Pluggable storage backends - Choose from MongoDB, PostgreSQL, Redis, or implement your own. Each backend is a separate package - install only what you need.
Pluggable notification channels - Move beyond polling with real-time job notifications via Redis, PostgreSQL LISTEN/NOTIFY, or other pub/sub systems. Jobs get processed immediately when saved, not on the next poll cycle.
Modern stack - ESM-only, Node.js 18+, full TypeScript with strict typing.
See the 6.x Roadmap for details and progress.
Install the core package and your preferred backend:
# For MongoDB
npm install agenda @agendajs/mongo-backend
# For PostgreSQL
npm install agenda @agendajs/postgres-backend
# For Redis
npm install agenda @agendajs/redis-backend
Requirements:
import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
const agenda = new Agenda({
backend: new MongoBackend({ address: 'mongodb://localhost/agenda' })
});
// Define a job
agenda.define('send email', async (job) => {
const { to, subject } = job.attrs.data;
await sendEmail(to, subject);
});
// Start processing
await agenda.start();
// Schedule jobs
await agenda.every('1 hour', 'send email', { to: 'user@example.com', subject: 'Hello' });
await agenda.schedule('in 5 minutes', 'send email', { to: 'admin@example.com', subject: 'Report' });
await agenda.now('send email', { to: 'support@example.com', subject: 'Urgent' });
| Package | Backend | Notifications | Install |
|---|---|---|---|
@agendajs/mongo-backend | MongoDB | Polling only | npm install @agendajs/mongo-backend |
@agendajs/postgres-backend | PostgreSQL | LISTEN/NOTIFY | npm install @agendajs/postgres-backend |
@agendajs/redis-backend | Redis | Pub/Sub | npm install @agendajs/redis-backend |
| Backend | Storage | Notifications | Notes |
|---|---|---|---|
MongoDB (MongoBackend) | ✅ | ❌ | Storage only. Combine with external notification channel for real-time. |
PostgreSQL (PostgresBackend) | ✅ | ✅ | Full backend. Uses LISTEN/NOTIFY for notifications. |
Redis (RedisBackend) | ✅ | ✅ | Full backend. Uses Pub/Sub for notifications. |
| InMemoryNotificationChannel | ❌ | ✅ | Notifications only. For single-process/testing. |
import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
// Via connection string
const agenda = new Agenda({
backend: new MongoBackend({ address: 'mongodb://localhost/agenda' })
});
// Via existing MongoDB connection
const agenda = new Agenda({
backend: new MongoBackend({ mongo: existingDb })
});
// With options
const agenda = new Agenda({
backend: new MongoBackend({
mongo: db,
collection: 'jobs' // Collection name (default: 'agendaJobs')
}),
processEvery: '30 seconds', // Job polling interval
maxConcurrency: 20, // Max concurrent jobs
defaultConcurrency: 5 // Default per job type
});
import { Agenda } from 'agenda';
import { PostgresBackend } from '@agendajs/postgres-backend';
const agenda = new Agenda({
backend: new PostgresBackend({
connectionString: 'postgresql://user:pass@localhost:5432/mydb'
})
});
import { Agenda } from 'agenda';
import { RedisBackend } from '@agendajs/redis-backend';
const agenda = new Agenda({
backend: new RedisBackend({
connectionString: 'redis://localhost:6379'
})
});
For faster job processing across distributed systems:
import { Agenda, InMemoryNotificationChannel } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
const agenda = new Agenda({
backend: new MongoBackend({ mongo: db }),
notificationChannel: new InMemoryNotificationChannel()
});
You can use MongoDB for storage while using a different system for real-time notifications:
import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
import { RedisBackend } from '@agendajs/redis-backend';
// MongoDB for storage + Redis for real-time notifications
const redisBackend = new RedisBackend({ connectionString: 'redis://localhost:6379' });
const agenda = new Agenda({
backend: new MongoBackend({ mongo: db }),
notificationChannel: redisBackend.notificationChannel
});
This is useful when you want MongoDB's proven durability and flexible queries for job storage, but need faster real-time notifications across multiple processes.
// Simple async handler
agenda.define('my-job', async (job) => {
console.log('Processing:', job.attrs.data);
});
// With options
agenda.define('my-job', async (job) => { /* ... */ }, {
concurrency: 10,
lockLimit: 5,
lockLifetime: 10 * 60 * 1000, // 10 minutes
priority: 'high'
});
For a class-based approach, use TypeScript decorators:
import { JobsController, Define, Every, registerJobs, Job } from 'agenda';
@JobsController({ namespace: 'email' })
class EmailJobs {
@Define({ concurrency: 5 })
async sendWelcome(job: Job<{ userId: string }>) {
console.log('Sending welcome to:', job.attrs.data.userId);
}
@Every('1 hour')
async cleanupBounced(job: Job) {
console.log('Cleaning up bounced emails');
}
}
registerJobs(agenda, [new EmailJobs()]);
await agenda.start();
// Schedule using namespaced name
await agenda.now('email.sendWelcome', { userId: '123' });
See Decorators Documentation for full details.
// Run immediately
await agenda.now('my-job', { userId: '123' });
// Run at specific time
await agenda.schedule('tomorrow at noon', 'my-job', data);
await agenda.schedule(new Date('2024-12-25'), 'my-job', data);
// Run repeatedly
await agenda.every('5 minutes', 'my-job');
await agenda.every('0 * * * *', 'my-job'); // Cron syntax
// Cancel jobs matching a filter (removes from database)
await agenda.cancel({ name: 'my-job' });
await agenda.cancel({ name: 'my-job', data: { userId: 123 } });
// Cancel ALL jobs unconditionally
await agenda.cancelAll();
// Disable/enable jobs globally (by query)
await agenda.disable({ name: 'my-job' }); // Disable all jobs matching query
await agenda.enable({ name: 'my-job' }); // Enable all jobs matching query
// Disable/enable individual jobs
const job = await agenda.create('my-job', data);
job.disable();
await job.save();
// Progress tracking
agenda.define('long-job', async (job) => {
for (let i = 0; i <= 100; i += 10) {
await doWork();
await job.touch(i); // Report progress 0-100
}
});
// Stop immediately - unlocks running jobs so other workers can pick them up
await agenda.stop();
// Drain - waits for running jobs to complete before stopping
await agenda.drain();
// Drain with timeout (30 seconds) - for cloud platforms with shutdown deadlines
const result = await agenda.drain(30000);
if (result.timedOut) {
console.log(`${result.running} jobs still running after timeout`);
}
// Drain with AbortSignal - for external control
const controller = new AbortController();
setTimeout(() => controller.abort(), 30000);
await agenda.drain({ signal: controller.signal });
Use drain() for graceful shutdowns where you want in-progress jobs to finish their work.
agenda.on('start', (job) => console.log('Job started:', job.attrs.name));
agenda.on('complete', (job) => console.log('Job completed:', job.attrs.name));
agenda.on('success', (job) => console.log('Job succeeded:', job.attrs.name));
agenda.on('fail', (err, job) => console.log('Job failed:', job.attrs.name, err));
// Job-specific events
agenda.on('start:send email', (job) => { /* ... */ });
agenda.on('fail:send email', (err, job) => { /* ... */ });
Use fail listeners to capture richer error context, such as stack traces,
without storing large payloads in job.attrs.failReason:
agenda.on('fail', async (err, job) => {
await saveJobError({
jobId: job.attrs._id,
jobName: job.attrs.name,
message: err.message,
stack: err.stack
});
});
For databases other than MongoDB, PostgreSQL, or Redis, implement AgendaBackend:
import { AgendaBackend, JobRepository } from 'agenda';
class SQLiteBackend implements AgendaBackend {
readonly repository: JobRepository;
readonly notificationChannel = undefined; // Or implement NotificationChannel
async connect() { /* ... */ }
async disconnect() { /* ... */ }
}
const agenda = new Agenda({
backend: new SQLiteBackend({ path: './jobs.db' })
});
See Custom Backend Driver for details.
Official Backend Packages:
Tools:
MIT