Use when project uses NestJS with RabbitMQ microservices. Covers transport config, message/event patterns, manual ACK, CQRS, hybrid apps, dead letters, health checks, and testing. Detection: check package.json for @nestjs/microservices + amqplib or amqp-connection-manager.
Slash command: `/bee:nestjs-rabbitmq`
These standards apply when the project uses NestJS with RabbitMQ as a message transport. **Detection:** check `package.json` for `@nestjs/microservices` AND (`amqplib` OR `amqp-connection-manager`). If neither is present, this skill does not apply.
Also read the nestjs stack skill for core NestJS conventions (modules, services, controllers, DI, testing). This skill covers RabbitMQ-specific microservice patterns only.
// main.ts — pure microservice (no HTTP)
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL || 'amqp://localhost:5672'],
queue: 'orders_queue',
queueOptions: {
durable: true, // survive broker restart
},
noAck: false, // manual acknowledgment — ALWAYS
prefetchCount: 1, // process one message at a time
persistent: true, // messages survive broker restart
},
});
await app.listen();
}
bootstrap();
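The option values above can be gathered in one place so every consumer starts from the same defaults. This is a minimal sketch, not part of NestJS: `buildRmqOptions`, the `RmqConsumerOptions` type and the `RABBITMQ_PREFETCH` variable are hypothetical names, and only the `options` object (not the `transport` field) is built, so the sketch has no dependencies.

```typescript
// Hypothetical helper: builds the `options` object used with Transport.RMQ.
// The field names mirror the bootstrap example; the type itself is an assumption.
interface RmqConsumerOptions {
  urls: string[];
  queue: string;
  queueOptions: { durable: boolean };
  noAck: boolean;
  prefetchCount: number;
  persistent: boolean;
}

type Env = Record<string, string | undefined>;

function buildRmqOptions(queue: string, env: Env): RmqConsumerOptions {
  const url = env.RABBITMQ_URL ?? 'amqp://localhost:5672';
  if (!/^amqps?:\/\//.test(url)) {
    throw new Error(`RABBITMQ_URL must start with amqp:// or amqps://, got "${url}"`);
  }

  // RABBITMQ_PREFETCH is an assumed variable name; it falls back to 1 (ordered processing).
  const prefetchCount = Number(env.RABBITMQ_PREFETCH ?? '1');
  if (!Number.isInteger(prefetchCount) || prefetchCount < 1) {
    throw new Error('RABBITMQ_PREFETCH must be a positive integer');
  }

  return {
    urls: [url],
    queue,
    queueOptions: { durable: true }, // queue survives broker restart
    noAck: false,                    // manual acknowledgment
    prefetchCount,
    persistent: true,                // messages survive broker restart
  };
}
```

A bootstrap function could then pass `buildRmqOptions('orders_queue', process.env)` as the `options` value.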
When the same app serves both HTTP endpoints AND processes messages:
// main.ts — hybrid app
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL],
queue: 'orders_queue',
queueOptions: { durable: true },
noAck: false,
prefetchCount: 1,
},
});
await app.startAllMicroservices();
await app.listen(3000);
}
- `noAck: false` ALWAYS. Manual acknowledgment prevents message loss on crashes. Never use `noAck: true` in production.
- `durable: true` on queues. Queues survive broker restarts. Messages in durable queues are recovered.
- `persistent: true` on messages. Messages survive broker restarts when in durable queues.
- `prefetchCount: 1` for ordered processing. Increase for throughput when order doesn't matter.

Request-response (`@MessagePattern`): synchronous-style RPC where the sender waits for a response:
// Consumer — handles the message and returns a response
@Controller()
export class OrdersController {
constructor(private readonly orderService: OrderService) {}
@MessagePattern('order.create')
async createOrder(@Payload() data: CreateOrderDto, @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
try {
const order = await this.orderService.create(data);
channel.ack(originalMsg);
return order; // response sent back to producer
} catch (error) {
channel.nack(originalMsg, false, false); // reject, don't requeue
throw new RpcException(error instanceof Error ? error.message : String(error));
}
}
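@MessagePattern('order.get')
async getOrder(@Payload() data: { id: string }, @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
try {
const order = await this.orderService.findById(data.id);
channel.ack(originalMsg);
return order;
} catch (error) {
channel.nack(originalMsg, false, false); // reject, don't requeue; otherwise the message stays unacked
throw new RpcException(error instanceof Error ? error.message : String(error));
}
}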
}
// Producer — sends message and waits for response
@Injectable()
export class OrderClient {
constructor(@Inject('ORDERS_SERVICE') private readonly client: ClientProxy) {}
createOrder(dto: CreateOrderDto): Observable<Order> {
return this.client.send<Order>('order.create', dto);
}
getOrder(id: string): Observable<Order> {
return this.client.send<Order>('order.get', { id });
}
}
Fire-and-forget (`@EventPattern`): the producer emits and one or more consumers process:
// Consumer — handles the event, no response returned
@Controller()
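export class NotificationsController {
// NotificationService is an assumed provider name; the handler below relies on this.notificationService
constructor(private readonly notificationService: NotificationService) {}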
@EventPattern('order.created')
async handleOrderCreated(@Payload() data: OrderCreatedEvent, @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
try {
await this.notificationService.sendOrderConfirmation(data);
channel.ack(originalMsg);
} catch (error) {
// Requeue only transient failures; requeueing a permanent failure retries forever
channel.nack(originalMsg, false, error instanceof RetryableError);
}
}
}
// Producer — emits event, does not wait for response
@Injectable()
export class OrderService {
constructor(@Inject('NOTIFICATIONS_SERVICE') private readonly client: ClientProxy) {}
async create(dto: CreateOrderDto): Promise<Order> {
const order = await this.orderRepository.save(dto);
this.client.emit('order.created', new OrderCreatedEvent(order));
return order;
}
}
| Pattern | Use When | Returns |
|---|---|---|
| `@MessagePattern` + `send()` | Need a response (RPC style): query data, create and return | Response value |
| `@EventPattern` + `emit()` | Fire-and-forget: notifications, audit, async processing | Nothing |
@Module({
imports: [
ClientsModule.register([
{
name: 'ORDERS_SERVICE',
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL],
queue: 'orders_queue',
queueOptions: { durable: true },
},
},
{
name: 'NOTIFICATIONS_SERVICE',
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL],
queue: 'notifications_queue',
queueOptions: { durable: true },
},
},
]),
],
})
export class AppModule {}
ClientsModule.registerAsync([
{
name: 'ORDERS_SERVICE',
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
transport: Transport.RMQ,
options: {
urls: [config.get<string>('RABBITMQ_URL')],
queue: config.get<string>('ORDERS_QUEUE', 'orders_queue'),
queueOptions: { durable: true },
},
}),
},
]),
@MessagePattern('process.order')
async processOrder(@Payload() data: OrderDto, @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
try {
await this.processor.process(data);
channel.ack(originalMsg); // success — remove from queue
} catch (error) {
if (error instanceof RetryableError) {
channel.nack(originalMsg, false, true); // requeue for retry
} else {
channel.nack(originalMsg, false, false); // reject — goes to DLQ if configured
}
}
}
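The ack/nack branching in the handler above can be pulled into one helper so every handler settles messages the same way. This is a sketch under assumptions: `RetryableError` is referenced by the handler but never defined in this document, so the class below is a hypothetical stand-in, and `AckChannel` is a minimal structural type used in place of the amqplib channel so the example runs without dependencies.

```typescript
// Minimal structural type (assumption) covering the two channel calls the handlers use.
interface AckChannel<M> {
  ack(msg: M): void;
  nack(msg: M, allUpTo?: boolean, requeue?: boolean): void;
}

// Hypothetical marker class for transient failures (network timeout, DB connection lost).
class RetryableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'RetryableError';
    // Keeps `instanceof` working when compiling to older targets.
    Object.setPrototypeOf(this, RetryableError.prototype);
  }
}

type Settlement = 'ack' | 'requeue' | 'reject';

// Settles a message exactly once, based on the outcome of processing.
function settle<M>(channel: AckChannel<M>, msg: M, error?: unknown): Settlement {
  if (error === undefined) {
    channel.ack(msg); // success: remove from queue
    return 'ack';
  }
  if (error instanceof RetryableError) {
    channel.nack(msg, false, true); // transient: requeue for retry
    return 'requeue';
  }
  channel.nack(msg, false, false); // permanent: dead-lettered if a DLQ is configured
  return 'reject';
}
```

In a handler, the `try` branch would call `settle(channel, originalMsg)` and the `catch` branch `settle(channel, originalMsg, error)`.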
- `ack(msg)`: message processed successfully. Remove from queue.
- `nack(msg, false, true)`: temporary failure, requeue for retry. Use for transient errors (network timeout, DB connection lost).
- `nack(msg, false, false)`: permanent failure, don't requeue. Message goes to dead letter queue (DLQ) if configured. Use for validation errors, business rule violations.

// Main queue with DLQ binding
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL],
queue: 'orders_queue',
queueOptions: {
durable: true,
arguments: {
'x-dead-letter-exchange': '', // default exchange
'x-dead-letter-routing-key': 'orders_dlq', // DLQ queue name
'x-message-ttl': 30000, // optional: TTL before DLQ
},
},
noAck: false,
},
});
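// Separate microservice or hybrid app listening on DLQ
// Caveat: Nest routes on the `pattern` field in the message body, not on the queue name.
// Dead-lettered messages keep their original body, so verify that the pattern below matches what actually arrives.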
@Controller()
export class DlqController {
@EventPattern('orders_dlq')
async handleDeadLetter(@Payload() data: any, @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
const headers = originalMsg.properties.headers;
// Log the failure for investigation
this.logger.error('Dead letter received', {
pattern: context.getPattern(),
data,
deathReason: headers?.['x-death']?.[0]?.reason,
deathCount: headers?.['x-death']?.[0]?.count,
originalQueue: headers?.['x-death']?.[0]?.queue,
});
// Store in DB for manual review/retry
await this.deadLetterService.store(data, headers);
channel.ack(originalMsg);
}
}
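The `x-death` lookups in the handler above can be wrapped in a small typed helper. This is a sketch: `summarizeDeath` and its types are hypothetical names, and it assumes the broker lists the most recent dead-lettering event first in the `x-death` array, which is how RabbitMQ documents the header.

```typescript
// Subset of the fields RabbitMQ records per dead-lettering event.
interface XDeathEntry {
  reason?: string; // e.g. 'rejected', 'expired', 'maxlen'
  count?: number;
  queue?: string;
}

interface DeathSummary {
  reason: string;
  count: number;
  queue: string;
}

// Returns a summary of the most recent dead-lettering event, or null when the
// message carries no x-death header (it was never dead-lettered).
function summarizeDeath(headers: Record<string, unknown> | undefined): DeathSummary | null {
  const deaths = headers?.['x-death'];
  if (!Array.isArray(deaths) || deaths.length === 0) {
    return null;
  }
  const latest = deaths[0] as XDeathEntry;
  return {
    reason: latest.reason ?? 'unknown',
    count: Number(latest.count ?? 0),
    queue: latest.queue ?? 'unknown',
  };
}
```

The DLQ handler could log `summarizeDeath(originalMsg.properties.headers)` instead of repeating the optional-chaining lookups.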
@MessagePattern('order.process')
async processOrder(@Payload() data: OrderDto, @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
const retryCount = (originalMsg.properties.headers?.['x-retry-count'] ?? 0) as number;
const maxRetries = 3;
try {
await this.processor.process(data);
channel.ack(originalMsg);
} catch (error) {
channel.ack(originalMsg); // ack original to prevent immediate requeue
if (retryCount < maxRetries) {
// Re-publish with incremented retry count and delay
const delay = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s
// Caution: the timer lives in process memory, so a crash during the delay loses the message
setTimeout(() => {
// Re-send the original body so Nest's { pattern, data } envelope is preserved
channel.publish('', 'orders_queue', originalMsg.content, {
headers: { ...originalMsg.properties.headers, 'x-retry-count': retryCount + 1 },
persistent: true,
});
}, delay);
} else {
// Max retries exceeded — send to DLQ manually
channel.publish('', 'orders_dlq', Buffer.from(JSON.stringify({
originalData: data,
error: error.message,
retryCount,
timestamp: new Date().toISOString(),
})), { persistent: true });
}
}
}
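The retry arithmetic in the handler above is easier to test when separated from the channel calls. A minimal sketch, assuming the same `x-retry-count` header and the 1s, 2s, 4s schedule; `planRetry` and `RetryPlan` are hypothetical names.

```typescript
interface RetryPlan {
  delayMs: number;                  // how long to wait before re-publishing
  headers: Record<string, unknown>; // headers for the re-published message
}

// Returns the next retry step, or null once maxRetries is exhausted
// (the caller should then route the message to the DLQ).
function planRetry(
  headers: Record<string, unknown> | undefined,
  maxRetries = 3,
  baseDelayMs = 1000,
): RetryPlan | null {
  const raw = headers?.['x-retry-count'];
  // Treat a missing or malformed header as "no retries yet".
  const retryCount = typeof raw === 'number' && Number.isInteger(raw) && raw >= 0 ? raw : 0;

  if (retryCount >= maxRetries) {
    return null;
  }
  return {
    delayMs: baseDelayMs * Math.pow(2, retryCount), // 1s, 2s, 4s with the defaults
    headers: { ...headers, 'x-retry-count': retryCount + 1 },
  };
}
```

The handler's `catch` branch would call `planRetry(originalMsg.properties.headers)` and publish to the DLQ when it returns `null`.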
// Command
export class CreateOrderCommand {
constructor(
public readonly customerId: string,
public readonly items: OrderItem[],
) {}
}
// Handler
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
constructor(
private readonly repository: OrderRepository,
private readonly eventBus: EventBus,
) {}
async execute(command: CreateOrderCommand): Promise<Order> {
const order = await this.repository.create(command);
this.eventBus.publish(new OrderCreatedEvent(order.id, order.customerId));
return order;
}
}
// Saga — cross-service orchestration
@Injectable()
export class OrderSaga {
@Saga()
orderCreated = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(OrderCreatedEvent),
map(event => new NotifyWarehouseCommand(event.orderId)),
);
};
}
// Service A publishes domain event to RabbitMQ
@Injectable()
export class OrderEventPublisher {
constructor(@Inject('EVENTS_SERVICE') private readonly client: ClientProxy) {}
publishOrderCreated(order: Order) {
this.client.emit('domain.order.created', {
orderId: order.id,
customerId: order.customerId,
total: order.total,
timestamp: new Date().toISOString(),
});
}
}
// Service B consumes domain event from RabbitMQ
@Controller()
export class WarehouseEventsController {
@EventPattern('domain.order.created')
async handleOrderCreated(@Payload() data: OrderCreatedEvent, @Ctx() ctx: RmqContext) {
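const channel = ctx.getChannelRef();
const msg = ctx.getMessage();
try {
await this.warehouseService.reserveInventory(data.orderId, data.items);
channel.ack(msg);
} catch (error) {
channel.nack(msg, false, false); // never leave the message unacked; goes to DLQ if configured
}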
}
}
// Producer config with topic exchange
{
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL],
queue: 'events_queue',
queueOptions: { durable: true },
exchange: 'domain_events',
exchangeType: 'topic',
wildcards: true,
},
}
// Consumer — subscribe to patterns
@EventPattern('order.*') // matches order.created, order.cancelled, etc.
async handleOrderEvents(@Payload() data: any) { ... }
@EventPattern('order.#') // matches order.created, order.item.added, etc.
async handleAllOrderEvents(@Payload() data: any) { ... }
@EventPattern('*.created') // matches order.created, user.created, etc.
async handleCreatedEvents(@Payload() data: any) { ... }
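To make the difference between `*` and `#` concrete, the following sketch re-implements AMQP topic matching in plain TypeScript: `*` stands for exactly one dot-separated word and `#` for zero or more. `topicMatches` is a hypothetical illustration of the broker-side rule only; it is not how NestJS or RabbitMQ implement matching internally.

```typescript
// Returns true when `routingKey` satisfies the AMQP topic `pattern`.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // match(i, j): does p[i..] match k[j..]?
  const match = (i: number, j: number): boolean => {
    if (i === p.length) {
      return j === k.length; // pattern consumed: key must be consumed too
    }
    if (p[i] === '#') {
      // '#' may absorb zero or more words; try every possible split point.
      for (let next = j; next <= k.length; next++) {
        if (match(i + 1, next)) return true;
      }
      return false;
    }
    if (j === k.length) {
      return false; // pattern still expects a word but the key has none left
    }
    // '*' matches any single word; otherwise the words must be equal.
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}
```

Under this rule `order.*` accepts `order.created` but not `order.item.added`, while `order.#` accepts both.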
// All consumers get every message — useful for broadcasting
{
exchange: 'broadcast',
exchangeType: 'fanout',
}
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService, MicroserviceHealthIndicator } from '@nestjs/terminus';
import { Transport } from '@nestjs/microservices';
@Controller('health')
export class HealthController {
constructor(
private readonly health: HealthCheckService,
private readonly microservice: MicroserviceHealthIndicator,
) {}
@Get()
@HealthCheck()
check() {
return this.health.check([
() => this.microservice.pingCheck('rabbitmq', {
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL],
},
}),
]);
}
}
export class CreateOrderDto {
@IsString()
customerId: string;
@IsArray()
@ValidateNested({ each: true })
@Type(() => OrderItemDto)
items: OrderItemDto[];
}
// Validate incoming messages with a pipe
@MessagePattern('order.create')
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async createOrder(@Payload() data: CreateOrderDto) { ... }
// Wrap all events in a standard envelope
interface EventEnvelope<T = any> {
eventType: string;
timestamp: string;
correlationId: string;
source: string;
data: T;
}
// Producer
this.client.emit('order.created', {
eventType: 'order.created',
timestamp: new Date().toISOString(),
correlationId: uuidv4(),
source: 'order-service',
data: { orderId: order.id, customerId: order.customerId },
} satisfies EventEnvelope<OrderCreatedPayload>);
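Building the envelope by hand at every call site invites drift, so a small factory can fill in the common fields. A sketch under assumptions: `wrapEvent` is a hypothetical helper, the envelope type is repeated here so the block stands alone, and Node's built-in `randomUUID` is swapped in for `uuidv4()`.

```typescript
import { randomUUID } from 'node:crypto';

interface EventEnvelope<T = unknown> {
  eventType: string;
  timestamp: string;
  correlationId: string;
  source: string;
  data: T;
}

// Wraps a payload in the standard envelope. Pass an existing correlationId to
// continue a trace; a fresh one is generated otherwise.
function wrapEvent<T>(
  eventType: string,
  source: string,
  data: T,
  correlationId: string = randomUUID(),
): EventEnvelope<T> {
  return {
    eventType,
    timestamp: new Date().toISOString(),
    correlationId,
    source,
    data,
  };
}
```

A producer could then call `this.client.emit('order.created', wrapEvent('order.created', 'order-service', { orderId: order.id }))`.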
describe('OrdersController', () => {
let controller: OrdersController;
let orderService: jest.Mocked<OrderService>;
beforeEach(async () => {
const module = await Test.createTestingModule({
controllers: [OrdersController],
providers: [
{ provide: OrderService, useValue: { create: jest.fn(), findById: jest.fn() } },
],
}).compile();
controller = module.get(OrdersController);
orderService = module.get(OrderService);
});
it('should create order and ack message', async () => {
const dto = { customerId: '1', items: [] };
const mockOrder = { id: '123', ...dto };
orderService.create.mockResolvedValue(mockOrder);
const mockChannel = { ack: jest.fn(), nack: jest.fn() };
const mockMsg = {};
const context = { getChannelRef: () => mockChannel, getMessage: () => mockMsg } as any;
const result = await controller.createOrder(dto, context);
expect(result).toEqual(mockOrder);
expect(mockChannel.ack).toHaveBeenCalledWith(mockMsg);
});
it('should nack message on error', async () => {
orderService.create.mockRejectedValue(new Error('DB error'));
const mockChannel = { ack: jest.fn(), nack: jest.fn() };
const mockMsg = {};
const context = { getChannelRef: () => mockChannel, getMessage: () => mockMsg } as any;
await expect(controller.createOrder({} as any, context)).rejects.toThrow();
expect(mockChannel.nack).toHaveBeenCalledWith(mockMsg, false, false);
});
});
describe('Orders Microservice (e2e)', () => {
let app: INestMicroservice;
let client: ClientProxy;
beforeAll(async () => {
const module = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = module.createNestMicroservice({
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL],
queue: 'test_orders_queue',
noAck: false, // handlers ack manually; Nest defaults to noAck: true
},
});
await app.listen();
client = ClientProxyFactory.create({
transport: Transport.RMQ,
options: { urls: [process.env.RABBITMQ_URL], queue: 'test_orders_queue' },
});
await client.connect();
});
afterAll(async () => {
await client.close();
await app.close();
});
it('should create order via message pattern', async () => {
const result = await firstValueFrom(
client.send('order.create', { customerId: '1', items: [{ productId: '1', qty: 2 }] }),
);
expect(result.id).toBeDefined();
});
});
src/
  modules/
    orders/
      orders.module.ts
      orders.controller.ts        ← @MessagePattern / @EventPattern handlers
      orders.service.ts           ← business logic
      orders.repository.ts        ← DB access
      dto/
        create-order.dto.ts
      events/
        order-created.event.ts
      commands/                   ← CQRS (if used)
        create-order.command.ts
        create-order.handler.ts
      sagas/                      ← CQRS sagas (if used)
        order.saga.ts
    notifications/
      notifications.module.ts
      notifications.controller.ts ← event consumers
  common/
    interfaces/
      event-envelope.interface.ts
    pipes/
      validation.pipe.ts
  health/
    health.controller.ts
  config/
    rabbitmq.config.ts
  main.ts
Never:

- Use `noAck: true` in production: messages are lost on crash. Always use manual ACK.
- Use `nack(msg, false, true)` for permanent failures: it creates an infinite retry loop. Use `false` (don't requeue) and let the DLQ handle it.
- Hardcode connection URLs: use `ConfigService` with environment variables.
- Omit `durable: true` in production: messages are lost on broker restart.
- Use `send()` when you don't need a response: use `emit()` for fire-and-forget events.
- Skip `ValidationPipe` on `@MessagePattern` handlers.
- Mix `@MessagePattern` and `@EventPattern` for the same queue pattern: they have different delivery semantics.

Checklist:

- Every `@MessagePattern` and `@EventPattern` handler calls `channel.ack()` or `channel.nack()`.
- DTOs are validated with `class-validator` decorators.
- `/health` checks RabbitMQ connectivity via `@nestjs/terminus`.
- Messages carry a `correlationId` for distributed tracing.
- Events use the standard envelope `{ eventType, timestamp, correlationId, source, data }`.
- Each concern has its own queue (`orders_queue`, `notifications_queue`, `audit_queue`), not one shared queue.
- Clients are registered with `ClientsModule.registerAsync()` and `ConfigService` for env-based URLs.
- Topic wildcards (`order.*`, `*.created`) are used for flexible event subscription.

Common pitfalls:

- Forgetting `channel.ack()`. The message stays unacked, the prefetch slot is consumed, and the consumer stops receiving after `prefetchCount` messages.
- `nack(msg, false, true)` on permanent failure → message requeued → same handler fails → infinite loop. Use a DLQ.
- Calling `send()` or `emit()` before `client.connect()` resolves: add `onApplicationBootstrap()` with `await this.client.connect()`.
- `@EventPattern` doesn't propagate errors to the producer. Errors must be caught and handled (logged, DLQ'd) inside the handler.
- Using `@MessagePattern` (request-response) when `@EventPattern` (fire-and-forget) is appropriate.
- Hardcoding connection settings instead of reading them from `ConfigService`.

When looking up documentation, use these Context7 library identifiers:
- `/websites/nestjs`: RabbitMQ transport, ClientProxy, message patterns, event patterns
- `/nestjs/docs.nestjs.com`: modules, DI, guards, interceptors, pipes
- `/websites/nestjs` (search "cqrs"): commands, events, sagas, event sourcing

Always check Context7 for the latest NestJS microservice API; transport options and patterns evolve between major versions.