这四个库都是用于在 Node.js 环境中处理后台任务和定时作业的解决方案。agenda 基于 MongoDB,bull 基于 Redis,kue 也是基于 Redis 但已停止维护,pg-boss 则直接利用 PostgreSQL 数据库作为队列存储。它们帮助开发者将耗时操作(如发送邮件、处理图片)从主请求线程中剥离,提高应用响应速度。
在构建现代 Web 应用时,将耗时任务(如发送邮件、生成报表)放入后台队列是标准做法。agenda、bull、kue 和 pg-boss 都是 Node.js 生态中流行的作业调度库,但它们的底层架构和适用场景截然不同。本文将从架构、代码实现和维护状态三个维度进行深度解析。
这四个库最大的区别在于它们依赖的存储后端,这直接影响了你的基础设施成本和性能表现。
agenda 依赖 MongoDB。
bull 依赖 Redis。
kue 依赖 Redis。
bull 类似,但设计更早期。pg-boss 依赖 PostgreSQL。
LISTEN/NOTIFY 机制和表存储。不同的库在处理任务的定义和执行上有不同的代码风格。现代开发更倾向于 async/await,但老牌库可能保留回调风格。
agenda 使用基于回调或 Promise 的定义方式,支持灵活查询。
// agenda: 定义任务
agenda.define('send email', async (job) => {
const { to, subject } = job.attrs.data;
await emailService.send(to, subject);
});
// 启动调度器
agenda.start();
bull 采用队列实例化模式,处理逻辑与队列绑定。
// bull: 定义处理逻辑
const Queue = require('bull');
const emailQueue = new Queue('email');
emailQueue.process(async (job) => {
const { to, subject } = job.data;
await emailService.send(to, subject);
});
kue 使用全局单例队列,API 较为古老,大量使用回调。
// kue: 定义处理逻辑
const kue = require('kue');
const queue = kue.createQueue();
queue.process('email', (job, done) => {
emailService.send(job.data.to, job.data.subject)
.then(() => done())
.catch(done);
});
pg-boss 强调基于名称的工作注册,完全支持 Promise。
// pg-boss: 定义工作
const PgBoss = require('pg-boss');
const boss = new PgBoss('postgres://user:pass@host/db');
await boss.start();
await boss.work('send-email', async (job) => {
const { to, subject } = job.data;
await emailService.send(to, subject);
});
对于定时任务(Cron)或延迟任务,各库的实现方式反映了其底层存储的能力。
agenda 原生支持强大的 cron 语法,非常适合复杂的调度需求。
// agenda: 每天上午 9 点运行
agenda.schedule('0 9 * * *', 'daily report', { type: 'summary' });
bull 通过配置项支持重复任务,依赖 Redis 的计时器。
// bull: 每 1000ms 运行一次
emailQueue.add({ to: 'admin' }, {
repeat: { every: 1000 }
});
kue 支持延迟执行,但复杂的 cron 调度需配合外部库。
// kue: 延迟 5 秒执行
kue.Job.create('email', data)
.delay(5000)
.save();
pg-boss 支持基于时间的调度,利用数据库时钟。
// pg-boss: 延迟发送
await boss.send('send-email', data, {
singletonKey: 'unique-id',
singletonSeconds: 10
});
这是架构选型中最关键的一环。使用已废弃的库会带来严重的安全和技术债务。
kue:已废弃(Deprecated)。其 GitHub 仓库已归档,最后更新停留在数年前。存在未修复的安全漏洞,严禁在新项目中使用。agenda:维护中。社区活跃,定期发布补丁,适合 Mongo 栈。bull:维护中。非常稳定,但注意 bull (v3/v4) 与 bullmq (新一代) 的区别,本文讨论的是经典版 bull。pg-boss:维护中。随着 Postgres 在 Node 社区的流行,其关注度正在上升。尽管实现不同,这四个库都提供了以下核心能力,帮助你构建稳健的后台系统:
kue 除外,因其已废弃且数据可能不一致)。// 所有库都支持类似的数据结构
// { id: 1, name: 'email', data: {...}, status: 'active' }
// bull 示例
queue.add(data, { attempts: 3 });
// agenda 示例
job.fail('error message'); // 可触发重试逻辑
// bull: 限制并发数为 5
queue.process(5, async (job) => { ... });
// pg-boss: 在 work 选项中配置
boss.work('task', { batchSize: 5 }, handler);
| 特性 | agenda | bull | kue | pg-boss |
|---|---|---|---|---|
| 存储后端 | MongoDB | Redis | Redis | PostgreSQL |
| 维护状态 | ✅ 活跃 | ✅ 活跃 | ❌ 已废弃 | ✅ 活跃 |
| 基础设施 | 需 Mongo | 需 Redis | 需 Redis | 仅需 Postgres |
| 调度能力 | ⭐⭐⭐⭐ (Cron) | ⭐⭐⭐ (Repeat) | ⭐⭐ (Delay) | ⭐⭐⭐ (Interval) |
| 性能 | 中 | 高 | 中 | 中 |
| 推荐指数 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ❌ | ⭐⭐⭐⭐ |
选择哪个库,本质上是在选择你的基础设施依赖。
bull 是不二之选。它是经过时间考验的工业级标准。agenda 能减少架构复杂度。pg-boss 提供了极佳的“开箱即用”体验。kue。技术选型不仅要考虑功能,还要考虑长期的安全性和可维护性。最后建议:对于全新的 Node.js 项目,如果不确定未来规模,pg-boss 是启动成本最低的选择;如果预见到高并发流量,直接上 bull 更为稳妥。
如果你已经在项目中使用了 MongoDB,并且需要灵活的 cron 表达式来调度任务,agenda 是一个自然的选择。它的 API 设计直观,支持任务持久化,适合需要精确控制执行时间的场景。但要注意,高并发下 MongoDB 可能成为瓶颈,需评估数据库负载。
如果你追求高性能和高并发处理能力,且架构中已经部署了 Redis,bull 是业界的标准选择。它支持优先级、延迟任务和重试机制,稳定性经过大量生产环境验证。对于需要快速处理大量短时任务的项目,它是首选方案。
不要在新项目中选择 kue。该库已正式废弃(deprecated),不再接收安全更新或功能修复。虽然它的 API 很简单,但继续使用会带来维护风险。请迁移到 bull 或其他现代替代方案。
如果你的技术栈以 PostgreSQL 为主,且希望减少基础设施依赖(不需要额外部署 Redis 或 Mongo),pg-boss 是最佳选择。它利用数据库的监听/通知机制实现队列,运维成本低。适合中小型项目或希望架构简单化的团队。
A light-weight job scheduling library for Node.js
Migrating from v5? See the Migration Guide for all breaking changes.
Agenda 6.x is a complete TypeScript rewrite with a focus on modularity and flexibility:
Pluggable storage backends - Choose from MongoDB, PostgreSQL, Redis, or implement your own. Each backend is a separate package - install only what you need.
Pluggable notification channels - Move beyond polling with real-time job notifications via Redis, PostgreSQL LISTEN/NOTIFY, or other pub/sub systems. Jobs get processed immediately when saved, not on the next poll cycle.
Modern stack - ESM-only, Node.js 18+, full TypeScript with strict typing.
See the 6.x Roadmap for details and progress.
Install the core package and your preferred backend:
# For MongoDB
npm install agenda @agendajs/mongo-backend
# For PostgreSQL
npm install agenda @agendajs/postgres-backend
# For Redis
npm install agenda @agendajs/redis-backend
Requirements:
import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
const agenda = new Agenda({
backend: new MongoBackend({ address: 'mongodb://localhost/agenda' })
});
// Define a job
agenda.define('send email', async (job) => {
const { to, subject } = job.attrs.data;
await sendEmail(to, subject);
});
// Start processing
await agenda.start();
// Schedule jobs
await agenda.every('1 hour', 'send email', { to: 'user@example.com', subject: 'Hello' });
await agenda.schedule('in 5 minutes', 'send email', { to: 'admin@example.com', subject: 'Report' });
await agenda.now('send email', { to: 'support@example.com', subject: 'Urgent' });
| Package | Backend | Notifications | Install |
|---|---|---|---|
@agendajs/mongo-backend | MongoDB | Polling only | npm install @agendajs/mongo-backend |
@agendajs/postgres-backend | PostgreSQL | LISTEN/NOTIFY | npm install @agendajs/postgres-backend |
@agendajs/redis-backend | Redis | Pub/Sub | npm install @agendajs/redis-backend |
| Backend | Storage | Notifications | Notes |
|---|---|---|---|
MongoDB (MongoBackend) | ✅ | ❌ | Storage only. Combine with external notification channel for real-time. |
PostgreSQL (PostgresBackend) | ✅ | ✅ | Full backend. Uses LISTEN/NOTIFY for notifications. |
Redis (RedisBackend) | ✅ | ✅ | Full backend. Uses Pub/Sub for notifications. |
| InMemoryNotificationChannel | ❌ | ✅ | Notifications only. For single-process/testing. |
import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
// Via connection string
const agenda = new Agenda({
backend: new MongoBackend({ address: 'mongodb://localhost/agenda' })
});
// Via existing MongoDB connection
const agenda = new Agenda({
backend: new MongoBackend({ mongo: existingDb })
});
// With options
const agenda = new Agenda({
backend: new MongoBackend({
mongo: db,
collection: 'jobs' // Collection name (default: 'agendaJobs')
}),
processEvery: '30 seconds', // Job polling interval
maxConcurrency: 20, // Max concurrent jobs
defaultConcurrency: 5 // Default per job type
});
import { Agenda } from 'agenda';
import { PostgresBackend } from '@agendajs/postgres-backend';
const agenda = new Agenda({
backend: new PostgresBackend({
connectionString: 'postgresql://user:pass@localhost:5432/mydb'
})
});
import { Agenda } from 'agenda';
import { RedisBackend } from '@agendajs/redis-backend';
const agenda = new Agenda({
backend: new RedisBackend({
connectionString: 'redis://localhost:6379'
})
});
For faster job processing across distributed systems:
import { Agenda, InMemoryNotificationChannel } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
const agenda = new Agenda({
backend: new MongoBackend({ mongo: db }),
notificationChannel: new InMemoryNotificationChannel()
});
You can use MongoDB for storage while using a different system for real-time notifications:
import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
import { RedisBackend } from '@agendajs/redis-backend';
// MongoDB for storage + Redis for real-time notifications
const redisBackend = new RedisBackend({ connectionString: 'redis://localhost:6379' });
const agenda = new Agenda({
backend: new MongoBackend({ mongo: db }),
notificationChannel: redisBackend.notificationChannel
});
This is useful when you want MongoDB's proven durability and flexible queries for job storage, but need faster real-time notifications across multiple processes.
// Simple async handler
agenda.define('my-job', async (job) => {
console.log('Processing:', job.attrs.data);
});
// With options
agenda.define('my-job', async (job) => { /* ... */ }, {
concurrency: 10,
lockLimit: 5,
lockLifetime: 10 * 60 * 1000, // 10 minutes
priority: 'high'
});
For a class-based approach, use TypeScript decorators:
import { JobsController, Define, Every, registerJobs, Job } from 'agenda';
@JobsController({ namespace: 'email' })
class EmailJobs {
@Define({ concurrency: 5 })
async sendWelcome(job: Job<{ userId: string }>) {
console.log('Sending welcome to:', job.attrs.data.userId);
}
@Every('1 hour')
async cleanupBounced(job: Job) {
console.log('Cleaning up bounced emails');
}
}
registerJobs(agenda, [new EmailJobs()]);
await agenda.start();
// Schedule using namespaced name
await agenda.now('email.sendWelcome', { userId: '123' });
See Decorators Documentation for full details.
// Run immediately
await agenda.now('my-job', { userId: '123' });
// Run at specific time
await agenda.schedule('tomorrow at noon', 'my-job', data);
await agenda.schedule(new Date('2024-12-25'), 'my-job', data);
// Run repeatedly
await agenda.every('5 minutes', 'my-job');
await agenda.every('0 * * * *', 'my-job'); // Cron syntax
// Cancel jobs (removes from database)
await agenda.cancel({ name: 'my-job' });
// Disable/enable jobs globally (by query)
await agenda.disable({ name: 'my-job' }); // Disable all jobs matching query
await agenda.enable({ name: 'my-job' }); // Enable all jobs matching query
// Disable/enable individual jobs
const job = await agenda.create('my-job', data);
job.disable();
await job.save();
// Progress tracking
agenda.define('long-job', async (job) => {
for (let i = 0; i <= 100; i += 10) {
await doWork();
await job.touch(i); // Report progress 0-100
}
});
// Stop immediately - unlocks running jobs so other workers can pick them up
await agenda.stop();
// Drain - waits for running jobs to complete before stopping
await agenda.drain();
// Drain with timeout (30 seconds) - for cloud platforms with shutdown deadlines
const result = await agenda.drain(30000);
if (result.timedOut) {
console.log(`${result.running} jobs still running after timeout`);
}
// Drain with AbortSignal - for external control
const controller = new AbortController();
setTimeout(() => controller.abort(), 30000);
await agenda.drain({ signal: controller.signal });
Use drain() for graceful shutdowns where you want in-progress jobs to finish their work.
agenda.on('start', (job) => console.log('Job started:', job.attrs.name));
agenda.on('complete', (job) => console.log('Job completed:', job.attrs.name));
agenda.on('success', (job) => console.log('Job succeeded:', job.attrs.name));
agenda.on('fail', (err, job) => console.log('Job failed:', job.attrs.name, err));
// Job-specific events
agenda.on('start:send email', (job) => { /* ... */ });
agenda.on('fail:send email', (err, job) => { /* ... */ });
For databases other than MongoDB, PostgreSQL, or Redis, implement AgendaBackend:
import { AgendaBackend, JobRepository } from 'agenda';
class SQLiteBackend implements AgendaBackend {
readonly repository: JobRepository;
readonly notificationChannel = undefined; // Or implement NotificationChannel
async connect() { /* ... */ }
async disconnect() { /* ... */ }
}
const agenda = new Agenda({
backend: new SQLiteBackend({ path: './jobs.db' })
});
See Custom Backend Driver for details.
Official Backend Packages:
Tools:
MIT