agenda vs async vs bee-queue vs bull vs kue vs p-queue vs queue
任务队列和异步处理
agendaasyncbee-queuebullkuep-queuequeue类似的npm包:

任务队列和异步处理

任务队列和异步处理库在Node.js应用程序中扮演着关键角色,帮助开发者管理和执行异步任务,优化资源使用,提升应用性能。这些库提供了不同的功能,如任务调度、并发控制、重试机制和持久化,适用于各种场景,从简单的异步操作到复杂的分布式任务处理。选择合适的库取决于具体需求,如任务的复杂性、需要的功能以及对性能和可扩展性的要求。

npm下载趋势

3 年

GitHub Stars 排名

统计详情

npm包名称
下载量
Stars
大小
Issues
发布时间
License
agenda09,663297 kB56 天前MIT
async028,169808 kB242 年前MIT
bee-queue04,027107 kB485 个月前MIT
bull016,243309 kB1491 年前MIT
kue09,447-2889 年前MIT
p-queue04,20282.5 kB619 天前MIT
queue077219.6 kB193 年前MIT

功能对比: agenda vs async vs bee-queue vs bull vs kue vs p-queue vs queue

任务调度

  • agenda:

    agenda支持复杂的任务调度,包括重复任务和基于时间的调度。它允许你定义任务并设置调度规则,任务将在指定时间自动执行。

  • async:

    async不提供任务调度功能,但允许你控制异步任务的执行顺序和并发。它适合管理异步操作,但不支持基于时间的调度。

  • bee-queue:

    bee-queue专注于快速处理任务,但不提供内置的调度功能。任务一旦加入队列,将尽快被处理。

  • bull:

    bull支持延迟任务和重复任务调度,允许你设置任务的执行时间和频率。它提供丰富的调度功能,适合需要时间控制的任务。

  • kue:

    kue支持延迟任务和任务优先级,但调度功能较为基础。它允许你设置任务的延迟时间,但不支持复杂的调度规则。

  • p-queue:

    p-queue不提供任务调度功能,但允许你控制任务的并发执行。它适合管理任务的执行顺序,但不支持基于时间的调度。

  • queue:

    queue提供基本的任务排队功能,但不支持调度。任务将按照加入顺序依次执行。

并发控制

  • agenda:

    agenda允许你设置并发限制,控制同时执行的任务数量。它适合需要限制资源使用的应用。

  • async:

    async提供多种并发控制方法,如parallelseriesqueue,允许你灵活管理任务的并发执行。

  • bee-queue:

    bee-queue支持并发处理任务,允许你设置工作者数量以控制并发度。它适合需要高并发处理的应用。

  • bull:

    bull支持并发处理任务,允许你配置工作者数量和任务优先级。它提供灵活的并发控制,适合高负载应用。

  • kue:

    kue支持并发处理任务,但并发控制较为简单。你可以设置工作者数量,但缺乏高级并发管理功能。

  • p-queue:

    p-queue允许你设置最大并发数,控制同时执行的任务数量。它适合需要精细控制并发的应用。

  • queue:

    queue支持设置并发数,允许你控制同时执行的任务数量。它提供简单的并发管理功能。

任务持久化

  • agenda:

    agenda基于MongoDB实现任务持久化,确保任务在应用重启后不会丢失。它适合需要持久化和可靠性的任务调度。

  • async:

    async不提供任务持久化功能,所有任务在内存中管理。它适合短期的异步操作,但不适合需要持久化的任务。

  • bee-queue:

    bee-queue基于Redis实现任务持久化,确保任务在系统崩溃后不会丢失。它适合需要高性能和持久化的任务队列。

  • bull:

    bull基于Redis实现任务持久化,支持任务重试和失败处理。它提供可靠的持久化机制,适合生产环境使用。

  • kue:

    kue基于Redis实现任务持久化,支持任务状态跟踪和失败重试。它提供持久化和监控功能,适合需要可靠性的应用。

  • p-queue:

    p-queue不提供任务持久化功能,任务在内存中管理。它适合短期的并发任务管理,但不适合需要持久化的场景。

  • queue:

    queue不提供任务持久化功能,任务在内存中排队。它适合简单的任务处理,但不适合需要持久化的应用。

代码示例

  • agenda:

    任务调度示例

    const { Agenda } = require('agenda');
    const agenda = new Agenda({ db: { address: 'mongodb://localhost/agenda' } });
    
    agenda.define('send email', async (job) => {
      const { to, subject, body } = job.attrs.data;
      // 发送邮件逻辑
    });
    
    (async function() {
      await agenda.start();
      await agenda.schedule('in 2 minutes', 'send email', { to: 'user@example.com', subject: 'Hello', body: 'This is a test email.' });
    })();
    
  • async:

    异步并发控制示例

    const async = require('async');
    
    const tasks = [
      (cb) => setTimeout(() => cb(null, 'Task 1'), 1000),
      (cb) => setTimeout(() => cb(null, 'Task 2'), 500),
      (cb) => setTimeout(() => cb(null, 'Task 3'), 2000),
    ];
    
    async.parallel(tasks, (err, results) => {
      if (err) throw err;
      console.log('All tasks completed:', results);
    });
    
  • bee-queue:

    任务队列示例

    const Queue = require('bee-queue');
    const emailQueue = new Queue('email');
    
    emailQueue.process(async (job) => {
      const { to, subject, body } = job.data;
      // 发送邮件逻辑
      return `Email sent to ${to}`;
    });
    
    const job = emailQueue.createJob({ to: 'user@example.com', subject: 'Hello', body: 'This is a test email.' });
    job.save();
    
  • bull:

    高级任务队列示例

    const Queue = require('bull');
    const emailQueue = new Queue('email');
    
    emailQueue.process(async (job) => {
      const { to, subject, body } = job.data;
      // 发送邮件逻辑
      return `Email sent to ${to}`;
    });
    
    emailQueue.add({ to: 'user@example.com', subject: 'Hello', body: 'This is a test email.' }, { delay: 60000, attempts: 3 });
    
  • kue:

    任务队列示例

    const kue = require('kue');
    const queue = kue.createQueue();
    
    queue.process('email', (job, done) => {
      const { to, subject, body } = job.data;
      // 发送邮件逻辑
      done();
    });
    
    const job = queue.create('email', { to: 'user@example.com', subject: 'Hello', body: 'This is a test email.' }).save();
    
  • p-queue:

    并发控制示例

    const PQueue = require('p-queue');
    const queue = new PQueue({ concurrency: 2 });
    
    const tasks = [
      () => new Promise(resolve => setTimeout(() => resolve('Task 1'), 1000)),
      () => new Promise(resolve => setTimeout(() => resolve('Task 2'), 500)),
      () => new Promise(resolve => setTimeout(() => resolve('Task 3'), 2000)),
    ];
    
    tasks.forEach(task => queue.add(task));
    queue.onIdle().then(() => console.log('All tasks completed'));
    
  • queue:

    简单队列示例

    const Queue = require('queue');
    const queue = new Queue({ concurrency: 2 });
    
    queue.push(async () => {
      // 任务1逻辑
      console.log('Task 1 completed');
    });
    
    queue.push(async () => {
      // 任务2逻辑
      console.log('Task 2 completed');
    });
    
    queue.start();
    

如何选择: agenda vs async vs bee-queue vs bull vs kue vs p-queue vs queue

  • agenda:

    选择agenda如果你需要一个基于MongoDB的任务调度库,支持复杂的调度和重复任务。它适合需要持久化任务和灵活调度的应用。

  • async:

    选择async如果你需要一个全面的异步控制流库,提供多种并发控制和任务管理功能。它适合处理复杂的异步操作,但不提供任务持久化。

  • bee-queue:

    选择bee-queue如果你需要一个轻量级、高性能的队列库,专注于快速处理任务,适合对延迟敏感的应用。它基于Redis,适合需要简单且高效的任务队列。

  • bull:

    选择bull如果你需要一个功能丰富的Redis队列库,支持任务优先级、重复任务、延迟任务和事件监听。它适合需要高级功能和高可扩展性的应用。

  • kue:

    选择kue如果你需要一个基于Redis的任务队列,提供任务优先级、延迟任务和失败重试功能。它还提供一个Web界面用于监控任务状态。

  • p-queue:

    选择p-queue如果你需要一个轻量级的Promise队列库,支持并发控制和任务优先级。它适合需要简单并发管理的应用,但不提供持久化。

  • queue:

    选择queue如果你需要一个简单的队列实现,支持基本的任务排队和处理。它适合需要轻量级队列功能的应用,但功能较为基础。

agenda的README

Agenda

Agenda

A light-weight job scheduling library for Node.js

NPM Version NPM Downloads

Migrating from v5? See the Migration Guide for all breaking changes.

Agenda 6.x

Agenda 6.x is a complete TypeScript rewrite with a focus on modularity and flexibility:

  • Pluggable storage backends - Choose from MongoDB, PostgreSQL, Redis, or implement your own. Each backend is a separate package - install only what you need.

  • Pluggable notification channels - Move beyond polling with real-time job notifications via Redis, PostgreSQL LISTEN/NOTIFY, or other pub/sub systems. Jobs get processed immediately when saved, not on the next poll cycle.

  • Modern stack - ESM-only, Node.js 18+, full TypeScript with strict typing.

See the 6.x Roadmap for details and progress.

Features

  • Minimal overhead job scheduling
  • Pluggable storage backends (MongoDB, PostgreSQL, Redis)
  • TypeScript support with full typing
  • Scheduling via cron or human-readable syntax
  • Configurable concurrency and locking
  • Real-time job notifications (optional)
  • Sandboxed worker execution via fork mode
  • TypeScript decorators for class-based job definitions

Installation

Install the core package and your preferred backend:

# For MongoDB
npm install agenda @agendajs/mongo-backend

# For PostgreSQL
npm install agenda @agendajs/postgres-backend

# For Redis
npm install agenda @agendajs/redis-backend

Requirements:

  • Node.js 18+
  • Database of your choice (MongoDB 4+, PostgreSQL, or Redis)

Quick Start

import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';

const agenda = new Agenda({
  backend: new MongoBackend({ address: 'mongodb://localhost/agenda' })
});

// Define a job
agenda.define('send email', async (job) => {
  const { to, subject } = job.attrs.data;
  await sendEmail(to, subject);
});

// Start processing
await agenda.start();

// Schedule jobs
await agenda.every('1 hour', 'send email', { to: 'user@example.com', subject: 'Hello' });
await agenda.schedule('in 5 minutes', 'send email', { to: 'admin@example.com', subject: 'Report' });
await agenda.now('send email', { to: 'support@example.com', subject: 'Urgent' });

Official Backend Packages

PackageBackendNotificationsInstall
@agendajs/mongo-backendMongoDBPolling onlynpm install @agendajs/mongo-backend
@agendajs/postgres-backendPostgreSQLLISTEN/NOTIFYnpm install @agendajs/postgres-backend
@agendajs/redis-backendRedisPub/Subnpm install @agendajs/redis-backend

Backend Capabilities

BackendStorageNotificationsNotes
MongoDB (MongoBackend)Storage only. Combine with external notification channel for real-time.
PostgreSQL (PostgresBackend)Full backend. Uses LISTEN/NOTIFY for notifications.
Redis (RedisBackend)Full backend. Uses Pub/Sub for notifications.
InMemoryNotificationChannelNotifications only. For single-process/testing.

Backend Configuration

MongoDB

import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';

// Via connection string
const agenda = new Agenda({
  backend: new MongoBackend({ address: 'mongodb://localhost/agenda' })
});

// Via existing MongoDB connection
const agenda = new Agenda({
  backend: new MongoBackend({ mongo: existingDb })
});

// With options
const agenda = new Agenda({
  backend: new MongoBackend({
    mongo: db,
    collection: 'jobs'        // Collection name (default: 'agendaJobs')
  }),
  processEvery: '30 seconds', // Job polling interval
  maxConcurrency: 20,         // Max concurrent jobs
  defaultConcurrency: 5       // Default per job type
});

PostgreSQL

import { Agenda } from 'agenda';
import { PostgresBackend } from '@agendajs/postgres-backend';

const agenda = new Agenda({
  backend: new PostgresBackend({
    connectionString: 'postgresql://user:pass@localhost:5432/mydb'
  })
});

Redis

import { Agenda } from 'agenda';
import { RedisBackend } from '@agendajs/redis-backend';

const agenda = new Agenda({
  backend: new RedisBackend({
    connectionString: 'redis://localhost:6379'
  })
});

Real-Time Notifications

For faster job processing across distributed systems:

import { Agenda, InMemoryNotificationChannel } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';

const agenda = new Agenda({
  backend: new MongoBackend({ mongo: db }),
  notificationChannel: new InMemoryNotificationChannel()
});

Mixing Storage and Notification Backends

You can use MongoDB for storage while using a different system for real-time notifications:

import { Agenda } from 'agenda';
import { MongoBackend } from '@agendajs/mongo-backend';
import { RedisBackend } from '@agendajs/redis-backend';

// MongoDB for storage + Redis for real-time notifications
const redisBackend = new RedisBackend({ connectionString: 'redis://localhost:6379' });
const agenda = new Agenda({
  backend: new MongoBackend({ mongo: db }),
  notificationChannel: redisBackend.notificationChannel
});

This is useful when you want MongoDB's proven durability and flexible queries for job storage, but need faster real-time notifications across multiple processes.

API Overview

Defining Jobs

// Simple async handler
agenda.define('my-job', async (job) => {
  console.log('Processing:', job.attrs.data);
});

// With options
agenda.define('my-job', async (job) => { /* ... */ }, {
  concurrency: 10,
  lockLimit: 5,
  lockLifetime: 10 * 60 * 1000, // 10 minutes
  priority: 'high'
});

Defining Jobs with Decorators

For a class-based approach, use TypeScript decorators:

import { JobsController, Define, Every, registerJobs, Job } from 'agenda';

@JobsController({ namespace: 'email' })
class EmailJobs {
  @Define({ concurrency: 5 })
  async sendWelcome(job: Job<{ userId: string }>) {
    console.log('Sending welcome to:', job.attrs.data.userId);
  }

  @Every('1 hour')
  async cleanupBounced(job: Job) {
    console.log('Cleaning up bounced emails');
  }
}

registerJobs(agenda, [new EmailJobs()]);
await agenda.start();

// Schedule using namespaced name
await agenda.now('email.sendWelcome', { userId: '123' });

See Decorators Documentation for full details.

Scheduling Jobs

// Run immediately
await agenda.now('my-job', { userId: '123' });

// Run at specific time
await agenda.schedule('tomorrow at noon', 'my-job', data);
await agenda.schedule(new Date('2024-12-25'), 'my-job', data);

// Run repeatedly
await agenda.every('5 minutes', 'my-job');
await agenda.every('0 * * * *', 'my-job'); // Cron syntax

Job Control

// Cancel jobs matching a filter (removes from database)
await agenda.cancel({ name: 'my-job' });
await agenda.cancel({ name: 'my-job', data: { userId: 123 } });

// Cancel ALL jobs unconditionally
await agenda.cancelAll();

// Disable/enable jobs globally (by query)
await agenda.disable({ name: 'my-job' });  // Disable all jobs matching query
await agenda.enable({ name: 'my-job' });   // Enable all jobs matching query

// Disable/enable individual jobs
const job = await agenda.create('my-job', data);
job.disable();
await job.save();

// Progress tracking
agenda.define('long-job', async (job) => {
  for (let i = 0; i <= 100; i += 10) {
    await doWork();
    await job.touch(i); // Report progress 0-100
  }
});

Stopping / Draining

// Stop immediately - unlocks running jobs so other workers can pick them up
await agenda.stop();

// Drain - waits for running jobs to complete before stopping
await agenda.drain();

// Drain with timeout (30 seconds) - for cloud platforms with shutdown deadlines
const result = await agenda.drain(30000);
if (result.timedOut) {
    console.log(`${result.running} jobs still running after timeout`);
}

// Drain with AbortSignal - for external control
const controller = new AbortController();
setTimeout(() => controller.abort(), 30000);
await agenda.drain({ signal: controller.signal });

Use drain() for graceful shutdowns where you want in-progress jobs to finish their work.

Events

agenda.on('start', (job) => console.log('Job started:', job.attrs.name));
agenda.on('complete', (job) => console.log('Job completed:', job.attrs.name));
agenda.on('success', (job) => console.log('Job succeeded:', job.attrs.name));
agenda.on('fail', (err, job) => console.log('Job failed:', job.attrs.name, err));

// Job-specific events
agenda.on('start:send email', (job) => { /* ... */ });
agenda.on('fail:send email', (err, job) => { /* ... */ });

Use fail listeners to capture richer error context, such as stack traces, without storing large payloads in job.attrs.failReason:

agenda.on('fail', async (err, job) => {
	await saveJobError({
		jobId: job.attrs._id,
		jobName: job.attrs.name,
		message: err.message,
		stack: err.stack
	});
});

Custom Backend

For databases other than MongoDB, PostgreSQL, or Redis, implement AgendaBackend:

import { AgendaBackend, JobRepository } from 'agenda';

class SQLiteBackend implements AgendaBackend {
  readonly repository: JobRepository;
  readonly notificationChannel = undefined; // Or implement NotificationChannel

  async connect() { /* ... */ }
  async disconnect() { /* ... */ }
}

const agenda = new Agenda({
  backend: new SQLiteBackend({ path: './jobs.db' })
});

See Custom Backend Driver for details.

Documentation

Related Packages

Official Backend Packages:

Tools:

License

MIT