From qa-async-jobs
Tests RabbitMQ producer/consumer interactions - supports AMQP 0.9.1 and AMQP 1.0 protocols across 6 tutorial patterns (Hello World, Work Queues, Publish/Subscribe, Routing, Topics, RPC) plus Publisher Confirms; consumer ack/nack/requeue patterns; durable queues + persistent messages; quorum vs classic queue; tests via Testcontainers RabbitMQ image or LocalStack-equivalent. Use when the user works with RabbitMQ producers/consumers (pika, amqplib, RabbitMQ.Client, spring-amqp) and needs unit/integration tests.
Slash command
/qa-async-jobs:rabbitmq-patterns
RabbitMQ is the leading OSS AMQP message broker. Per rabbitmq.com/tutorials, the canonical learning path is six tutorials covering progressively more sophisticated patterns:
| Tutorial | Pattern |
|---|---|
| Hello World! | Single producer → single consumer |
| Work Queues | Producer → multiple competing consumers (round-robin) |
| Publish/Subscribe | Fanout exchange → multiple consumers (broadcast) |
| Routing | Direct exchange with routing keys |
| Topics | Topic exchange with wildcard routing patterns |
| RPC | Request-reply via reply-to + correlation-id |
All six tutorials exist in both AMQP 0.9.1 and AMQP 1.0 variants per rmq-tut; AMQP 0.9.1 also has a 7th tutorial - Publisher Confirms - for delivery guarantees.
Per rmq-tut: "Executable versions of these tutorials are open source."
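The wildcard rules behind the Topics row can be sketched in plain Python, which is handy for parametrizing tests with expected routing outcomes. This is a minimal sketch of the AMQP 0.9.1 topic semantics (`*` matches exactly one dot-separated word, `#` matches zero or more); `topic_matches` is a hypothetical helper, not part of any client library, and it does not replace an integration test against a real broker.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Hypothetical helper mirroring topic-exchange rules:
    '*' matches exactly one word, '#' matches zero or more words.
    """
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

For example, a binding of `orders.*` would accept `orders.created` but not `orders.eu.created`, while `orders.#` would accept both.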
Three approaches to testing:
| Approach | Pros | Cons |
|---|---|---|
| Mock the AMQP client (pika.BlockingConnection, amqplib Channel) | Fast, no broker dep | Doesn't catch protocol-level behavior (ack races, requeue ordering) |
| Testcontainers RabbitMQ (Docker image per test class) | Full AMQP semantics, isolated | Slower; container startup cost |
| Shared dev RabbitMQ instance | Fast | Test interference; queue cleanup discipline required |
Pick Testcontainers for integration tests; mocking for pure producer-logic unit tests.
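For the mocking approach, a unit test might look like the sketch below. It assumes a hypothetical producer function, `publish_event`, that publishes to a hypothetical `events` exchange and uses the event type as the routing key; the channel is replaced with a `MagicMock` restricted to `basic_publish`, so no broker or AMQP client is needed.

```python
import json
from unittest.mock import MagicMock


def publish_event(channel, event: dict) -> None:
    """Hypothetical producer: routes each event by its 'type' field."""
    channel.basic_publish(
        exchange="events",            # assumed exchange name
        routing_key=event["type"],
        body=json.dumps(event),
    )


def test_publish_event_routes_by_type():
    # spec=[...] makes the mock reject any method other than basic_publish,
    # so a misspelled call in the producer fails the test.
    channel = MagicMock(spec=["basic_publish"])
    event = {"type": "order.created", "id": 1}

    publish_event(channel, event)

    channel.basic_publish.assert_called_once_with(
        exchange="events",
        routing_key="order.created",
        body=json.dumps(event),
    )
```

This only verifies the arguments the producer passes to the client; whether the broker actually routes or persists the message still needs a Testcontainers-backed test.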
import pika
import pytest
from testcontainers.rabbitmq import RabbitMqContainer


@pytest.fixture(scope="session")
def rabbitmq():
    # One broker container shared by the whole test session.
    with RabbitMqContainer("rabbitmq:3-management") as rmq:
        yield rmq


@pytest.fixture
def channel(rabbitmq):
    # get_connection_params() returns pika.ConnectionParameters for the mapped host and port.
    conn = pika.BlockingConnection(rabbitmq.get_connection_params())
    ch = conn.channel()
    yield ch
    conn.close()
(Testcontainers cleans up the container automatically when the fixture scope ends.)
Producer side:
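import json

import pika


def publish_order(channel, order_data):
    # Durable queue + persistent message: both are needed to survive a broker restart.
    channel.queue_declare(queue='orders', durable=True)
    channel.basic_publish(
        exchange='',  # default exchange routes by queue name
        routing_key='orders',
        body=json.dumps(order_data),
        properties=pika.BasicProperties(delivery_mode=2),  # persistent
    )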
Test:
def test_publish_order(channel):
    publish_order(channel, {"id": 1})
    method, props, body = channel.basic_get(queue='orders', auto_ack=True)
    assert method is not None
    assert json.loads(body) == {"id": 1}
def consume_order(channel):
    method, props, body = channel.basic_get(queue='orders')
    if method is None:
        return None
    try:
        process(body)
        channel.basic_ack(method.delivery_tag)
        return body
    except TransientError:
        channel.basic_nack(method.delivery_tag, requeue=True)  # requeue for retry
        raise
    except PermanentError:
        channel.basic_nack(method.delivery_tag, requeue=False)  # to DLX if configured
        raise
Test the requeue path:
def test_consumer_requeues_on_transient_error(channel, mocker):
    channel.queue_declare(queue='orders', durable=True)
    channel.basic_publish(exchange='', routing_key='orders', body='msg-1')
    mocker.patch('proj.consumer.process', side_effect=TransientError)
    with pytest.raises(TransientError):
        consume_order(channel)
    # Message should be back in the queue:
    method, _, body = channel.basic_get(queue='orders', auto_ack=True)
    assert body == b'msg-1'
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='orders-dlq', durable=True)
channel.queue_bind(queue='orders-dlq', exchange='dlx', routing_key='orders')
channel.queue_declare(
    queue='orders',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'orders',
    },
)
After basic_nack(requeue=False) or message TTL expiry, the message
routes to orders-dlq. Test by consuming from the DLQ.
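Consuming from the DLQ in a test can be sketched as a small polling helper. The helper names (`wait_for_dead_letter`, `death_reasons`) and the timeout values are assumptions for illustration; the only client call used is `basic_get`, and the only broker behavior relied on is the `x-death` header that RabbitMQ attaches to dead-lettered messages.

```python
import time


def wait_for_dead_letter(channel, queue="orders-dlq", timeout=2.0, interval=0.05):
    """Poll the DLQ until a message arrives; fail the test if none does.

    Hypothetical helper: polling avoids a race between the nack on the
    source queue and the message becoming visible in the DLQ.
    """
    deadline = time.monotonic() + timeout
    while True:
        method, props, body = channel.basic_get(queue=queue, auto_ack=True)
        if method is not None:
            return props, body
        if time.monotonic() >= deadline:
            raise AssertionError(f"no message arrived in {queue!r} within {timeout}s")
        time.sleep(interval)


def death_reasons(props):
    """Return the 'reason' of each x-death entry (e.g. 'rejected', 'expired')."""
    headers = getattr(props, "headers", None) or {}
    return [entry.get("reason") for entry in headers.get("x-death", [])]
```

In a test, after driving the consumer down the `requeue=False` path, one could call `props, body = wait_for_dead_letter(channel)` and then assert on `body` and on `"rejected" in death_reasons(props)`.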
Per the AMQP 0.9.1 Publisher Confirms tutorial (rmq-tut):
channel.confirm_delivery()
try:
    channel.basic_publish(
        exchange='',
        routing_key='orders',
        body='order-1',
        mandatory=True,
        properties=pika.BasicProperties(delivery_mode=2),
    )
    # If the broker can't route or persist, raises pika.exceptions.UnroutableError
    # or NackError. Otherwise the message was confirmed.
except pika.exceptions.UnroutableError:
    raise
For tests asserting publisher confirms succeed, simply enable confirm mode and assert no exception.
Quorum queues (since RabbitMQ 3.8) replace mirrored classic queues
for HA. Test pattern is identical to classic queues at the API
level; the difference is in arguments:
channel.queue_declare(
    queue='orders',
    durable=True,
    arguments={'x-queue-type': 'quorum'},
)
Quorum-specific testing (e.g., partition-tolerance) requires multi-node clusters - Jepsen-style; out of scope for typical integration tests.
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports: [5672:5672, 15672:15672]
    options: >-
      --health-cmd "rabbitmq-diagnostics ping"
      --health-interval 10s
      --health-timeout 5s
      --health-retries 5
steps:
  - run: pytest tests/integration/amqp/ -v
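The 3-management image tag bundles the management plugin, which serves the
management UI and HTTP API on port 15672. This is useful for inspecting
queues and bindings when debugging CI failures.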
| Anti-pattern | Why it fails | Fix |
|---|---|---|
| Use auto_ack=True for tests of error paths | Message ack-ed before processing → no requeue test possible | auto_ack=False (Step 4) |
| Test publisher without confirm_delivery() | Can't assert delivery happened - broker may have dropped the message | Enable confirm mode (Step 6) |
| Skip queue cleanup between tests | Stale messages cause flaky assertions | queue_purge or per-test queue names |
| Use durable=False on production-mirrored queues in tests | Tests pass; production loses messages on broker restart | Match production queue config in tests |
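The "per-test queue names" fix from the table can be sketched as a context manager that declares a uniquely named queue and deletes it afterwards. `temporary_queue` and the `test-orders` prefix are hypothetical; the sketch assumes a pika-style channel exposing `queue_declare` and `queue_delete`.

```python
import uuid
from contextlib import contextmanager


@contextmanager
def temporary_queue(channel, prefix="test-orders", **declare_kwargs):
    """Declare a uniquely named queue and delete it on exit.

    Hypothetical helper: a random suffix keeps tests from seeing each
    other's messages; extra keyword arguments (durable=..., arguments=...)
    are passed through so the queue can match production configuration.
    """
    name = f"{prefix}-{uuid.uuid4().hex[:8]}"
    channel.queue_declare(queue=name, **declare_kwargs)
    try:
        yield name
    finally:
        # Runs even if the test body raises, so no queue is left behind.
        channel.queue_delete(queue=name)
```

A test would then wrap its body in `with temporary_queue(channel, durable=True) as queue_name:` and publish to and consume from `queue_name` instead of a shared `orders` queue.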
sidekiq-tests, celery-tests, bullmq-tests, sqs-patterns - sister tools (Celery often runs on RabbitMQ; the celery-tests skill complements this for Python-Celery deployments)
idempotency-test-author - critical companion: requeue + redelivery semantics need idempotent consumers
npx claudepluginhub testland/qa --plugin qa-async-jobs