From ts-dev-kit
BullMQ reference for Node.js Redis-backed job queues: create queues/workers, add delayed/prioritized/repeatable/deduplicated jobs, FlowProducer hierarchies, retries/rate limiting/concurrency, cron schedulers, production setup, stalled job debugging.
Slash command: `/ts-dev-kit:bullmq`
Redis-backed queue system for Node.js. Four core classes: `Queue`, `Worker`, `QueueEvents`, `FlowProducer`.
Install with `yarn add bullmq`. Requires Redis 5.0+ configured with `maxmemory-policy=noeviction`.
<quick_reference>
import { Queue, Worker, QueueEvents } from "bullmq";

// --- Producer ---
const queue = new Queue("my-queue", {
  connection: { host: "localhost", port: 6379 },
});
await queue.add("job-name", { foo: "bar" });

// --- Consumer ---
const worker = new Worker(
  "my-queue",
  async (job) => {
    // process job
    await job.updateProgress(50);
    return { result: "done" };
  },
  { connection: { host: "localhost", port: 6379 } },
);

worker.on("completed", (job, returnvalue) => {
  console.log(`${job.id} completed with`, returnvalue);
});
worker.on("failed", (job, err) => {
  // job can be undefined here, hence the optional chaining
  console.error(`${job?.id} failed with`, err.message);
});
// IMPORTANT: always attach an error handler
worker.on("error", (err) => {
  console.error(err);
});

// --- Global event listener (all workers) ---
const queueEvents = new QueueEvents("my-queue", {
  connection: { host: "localhost", port: 6379 },
});
queueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed`);
});
queueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed: ${failedReason}`);
});
Job lifecycle:
add() → wait / prioritized / delayed
               ↓
            active → completed
               ↓
            failed → (retry) → wait / delayed
With FlowProducer: jobs can also be in waiting-children state until all children complete.
</quick_reference>
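To make the waiting-children note concrete, here is a minimal sketch of the tree shape a flow takes. The `FlowNode` type is a local stand-in that mirrors the `name` / `queueName` / `data` / `children` fields accepted by `FlowProducer.add()`; the queue names, job names, and the `completionOrder` helper are hypothetical and only illustrate that a parent is processed after its children.

```typescript
// Local stand-in for the job-tree shape passed to FlowProducer.add().
// Assumption: only the fields used in this sketch are modelled.
interface FlowNode {
  name: string;
  queueName: string;
  data?: Record<string, unknown>;
  children?: FlowNode[];
}

// Hypothetical flow: one parent job that waits for two children.
const reportFlow: FlowNode = {
  name: "assemble-report",
  queueName: "reports",
  data: { reportId: 42 },
  children: [
    { name: "fetch-sales", queueName: "fetch", data: { source: "sales" } },
    { name: "fetch-costs", queueName: "fetch", data: { source: "costs" } },
  ],
};

// Post-order walk: lists one valid completion order. Siblings may run in
// parallel in practice; the only point shown is children before parent.
function completionOrder(node: FlowNode): string[] {
  const order: string[] = [];
  const children = node.children ?? [];
  for (let i = 0; i < children.length; i++) {
    const childOrder = completionOrder(children[i]);
    for (let j = 0; j < childOrder.length; j++) {
      order.push(childOrder[j]);
    }
  }
  order.push(node.name);
  return order;
}

console.log(completionOrder(reportFlow));
```

With a real connection, the same object could be handed to `new FlowProducer({ connection }).add(reportFlow)`; the parent would then sit in waiting-children until both children complete.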
BullMQ uses ioredis internally. Pass connection options or an existing ioredis instance.
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";
// Option 1: connection config (new connection per instance)
const queue = new Queue("q", {
  connection: { host: "redis.example.com", port: 6379 },
});
// Option 2: reuse an ioredis instance (multiple Queue instances can share it)
const connection = new Redis();
const q1 = new Queue("q1", { connection });
const q2 = new Queue("q2", { connection });
// Option 3: reuse for Workers (BullMQ internally duplicates for blocking)
const workerConn = new Redis({ maxRetriesPerRequest: null });
const w1 = new Worker("q1", async (job) => {}, { connection: workerConn });
Critical rules:
- Workers require `maxRetriesPerRequest: null` on the ioredis instance. BullMQ enforces this and will warn/throw if not set.
- Do not use the ioredis `keyPrefix` option; use BullMQ's `prefix` option instead.
- `QueueEvents` cannot share connections (uses blocking Redis commands).
- Redis must be configured with `maxmemory-policy=noeviction`.

const queue = new Queue("paint", { connection });
// Add a job
await queue.add("job-name", { color: "red" });

// Add with options
await queue.add(
  "job-name",
  { color: "blue" },
  {
    delay: 5000, // wait 5s before processing
    priority: 1, // lower = higher priority (0 is highest, max 2^21)
    attempts: 3, // up to 3 attempts in total (first try + 2 retries)
    backoff: { type: "exponential", delay: 1000 },
    removeOnComplete: true, // or { count: 100 } to keep last 100
    removeOnFail: 1000, // keep last 1000 failed jobs
  },
);

// Add bulk
await queue.addBulk([
  { name: "job1", data: { x: 1 } },
  { name: "job2", data: { x: 2 }, opts: { priority: 1 } },
]);

// Queue operations
await queue.pause();
await queue.resume();
await queue.obliterate({ force: true }); // remove all data
await queue.close();
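The `attempts` and `backoff` options above work together: `attempts` caps the total number of tries and `backoff` spaces out the retries. As a rough illustration, the sketch below computes the wait before each retry, assuming the built-in exponential strategy follows the documented `2^(attemptsMade - 1) * delay` formula with no jitter. `retryDelays` is a hypothetical helper, not part of BullMQ.

```typescript
// Hypothetical helper: delays (ms) before each retry for the built-in
// "exponential" backoff, assuming delay * 2^(attemptsMade - 1) and no jitter.
function retryDelays(attempts: number, delay: number): number[] {
  const delays: number[] = [];
  // The first attempt runs immediately, so at most attempts - 1 retries follow.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * delay));
  }
  return delays;
}

// attempts: 3 with backoff { type: "exponential", delay: 1000 }
console.log(retryDelays(3, 1000)); // logs [ 1000, 2000 ]
```

Under these assumptions a job that keeps failing is retried after 1 s and then after 2 s, and is moved to failed once the third attempt fails.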
const worker = new Worker<MyData, MyReturn>(
  "paint",
  async (job) => {
    await job.updateProgress(42);
    return { cost: 100 };
  },
  {
    connection,
    concurrency: 5, // process 5 jobs concurrently
    autorun: false, // don't start immediately
  },
);
worker.run(); // start when ready

// Update concurrency at runtime
worker.concurrency = 10;
Processor receives 3 args: `(job, token?, signal?)`. `signal` is an AbortSignal for cancellation support.
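One way to use the `signal` argument is to check it between units of work so a cancelled job stops early. The sketch below is a minimal illustration under stated assumptions: `JobLike` is a local stand-in for the real job object, and `sumUnlessAborted` and the `values` payload are hypothetical. How and when the signal is triggered depends on the BullMQ version in use.

```typescript
// Minimal structural stand-in for the job argument; a real BullMQ Job has
// many more members. Assumption: only `data` is needed here.
interface JobLike<T> {
  data: T;
}

// Hypothetical unit of work: sums the numbers, checking for cancellation
// before each item so a cancelled job stops early.
function sumUnlessAborted(values: number[], signal?: AbortSignal): number {
  let total = 0;
  for (let i = 0; i < values.length; i++) {
    if (signal?.aborted) {
      throw new Error("job cancelled");
    }
    total += values[i];
  }
  return total;
}

// Processor with the three-argument shape described above.
async function processor(
  job: JobLike<{ values: number[] }>,
  token?: string,
  signal?: AbortSignal,
): Promise<{ total: number }> {
  return { total: sumUnlessAborted(job.data.values, signal) };
}
```

A function of this shape would be passed as the second argument to `new Worker(...)`. Throwing on cancellation is a design choice in this sketch so that the job ends up in the failed state rather than completing with a partial result.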
interface JobData {
  color: string;
}
interface JobReturn {
  cost: number;
}

const queue = new Queue<JobData, JobReturn>("paint", { connection });
const worker = new Worker<JobData, JobReturn>(
  "paint",
  async (job) => {
    // job.data is typed as JobData
    return { cost: 100 }; // must match JobReturn
  },
  { connection },
);
Worker events (local to that worker instance):
| Event | Callback signature |
|---|---|
| `completed` | `(job, returnvalue)` |
| `failed` | `(job \| undefined, error, prev)` |
| `progress` | `(job, progress: number \| object)` |
| `drained` | `()` (queue is empty) |
| `error` | `(error)` (MUST attach this handler) |
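Because the `progress` payload in the table above is either a number or an object, a handler usually has to narrow the type before using it. The following is a small sketch of that narrowing; `formatProgress` is a hypothetical helper, and treating numeric progress as a percentage is an assumption made for this example.

```typescript
// Hypothetical formatter for the `progress` event payload, assuming the
// number | object shape listed in the table above.
function formatProgress(progress: number | object): string {
  if (typeof progress === "number") {
    // Assumption: numeric progress is a percentage.
    return `${progress}%`;
  }
  // Object payloads are serialized as-is.
  return JSON.stringify(progress);
}

console.log(formatProgress(42)); // logs 42%
console.log(formatProgress({ step: "resize", done: 3 })); // logs {"step":"resize","done":3}
```

It could then be called from a worker listener, for example `worker.on("progress", (job, progress) => console.log(job.id, formatProgress(progress)))`.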
QueueEvents (global, all workers, uses Redis Streams):
| Event | Callback signature |
|---|---|
| `completed` | `({ jobId, returnvalue })` |
| `failed` | `({ jobId, failedReason })` |
| `progress` | `({ jobId, data })` |
| `waiting` | `({ jobId })` |
| `active` | `({ jobId, prev })` |
| `delayed` | `({ jobId, delay })` |
| `deduplicated` | `({ jobId, deduplicationId, deduplicatedJobId })` |
Event stream is auto-trimmed (~10,000 events). Configure via streams.events.maxLen.
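As a sketch of where that setting might live, the options object below nests the cap under the `streams.events.maxLen` path named above. The host, port, and the value 5000 are placeholders chosen for illustration, not recommendations.

```typescript
// Assumed option shape, following the streams.events.maxLen path named above.
// Host, port, and the 5000 cap are placeholder values.
const queueOptions = {
  connection: { host: "localhost", port: 6379 },
  streams: { events: { maxLen: 5000 } },
};

// Intended use (needs bullmq installed and a reachable Redis):
//   const queue = new Queue("my-queue", queueOptions);
console.log(queueOptions.streams.events.maxLen); // logs 5000
```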