Tests STOMP over WebSocket (Spring, ActiveMQ, RabbitMQ Web STOMP) and AMQP 0-9-1 (RabbitMQ Java client) - frame connect/subscribe/send/ack sequences, ack modes (auto/client/client-individual), exchange and queue declarations, binding routing, Testcontainers RabbitMQ broker, and delivery assertion. Use when validating enterprise Spring or RabbitMQ messaging stacks before deploy.
How this skill is triggered — by the user, by Claude, or both
Slash command
/qa-realtime-protocols:stomp-amqp-testsThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
This skill covers two complementary enterprise messaging protocols that
This skill covers two complementary enterprise messaging protocols that
travel together on Spring and RabbitMQ stacks: STOMP over WebSocket
(used by Spring @MessageMapping endpoints and browser clients) and
AMQP 0-9-1 (used by RabbitMQ producer/consumer code). Both require
protocol-level test coverage that HTTP and WebSocket-only tools miss.
STOMP frame semantics are defined in the STOMP 1.2 specification. AMQP 0-9-1 exchange/queue/binding and acknowledgement behaviour is documented in RabbitMQ's AMQP concepts guide.
Nearest neighbors and differentiation:
mqtt-tests - covers QoS 0/1/2 for IoT/M2M; does not cover STOMP frames,
exchange routing, or Java AMQP client patterns.websocket-tests - covers raw WebSocket frames; does not know STOMP frame
types, ack modes, or AMQP broker topology.webhook-replay-tests - covers HTTP at-least-once delivery; different
transport and no broker involved.@MessageMapping / @SubscribeMapping endpoints need pre-deploy
frame-level validation.auto, client, client-individual) or prefetch
limits need explicit test coverage.The Testcontainers RabbitMQ module spins up an isolated broker per test class. Per java.testcontainers.org/modules/rabbitmq/, add the dependency:
<!-- Maven -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-rabbitmq</artifactId>
<version>2.0.5</version>
<scope>test</scope>
</dependency>
@Testcontainers
class BrokerTest {
@Container
static RabbitMQContainer rabbit =
new RabbitMQContainer("rabbitmq:3.13-management");
@BeforeAll
static void enablePlugins() {
// Web STOMP is not enabled by default in the base image;
// enable for STOMP-over-WebSocket tests
rabbit.execInContainer("rabbitmq-plugins", "enable",
"rabbitmq_stomp", "rabbitmq_web_stomp");
}
}
The management image exposes: AMQP on 5672, management UI on 15672,
and STOMP on 61613 (after plugin is enabled). The container maps these
to random host ports retrieved via rabbit.getMappedPort(5672) etc.
Per the STOMP 1.2 spec,
a session opens with a CONNECT frame (or STOMP frame; both are valid
in 1.2) that MUST carry accept-version and host headers. The server
replies with a CONNECTED frame carrying version.
// Uses the stompclient Java library or Spring's StompSession
StompSession session = stompClient
.connectAsync("ws://localhost:" + rabbit.getMappedPort(15674) + "/ws",
new StompSessionHandlerAdapter() {})
.get(5, TimeUnit.SECONDS);
assertThat(session.isConnected()).isTrue();
session.disconnect();
For raw frame assertions use a TCP/WebSocket client and read the
CONNECTED frame: a missing version header means the broker rejected
the accept-version negotiation.
Heart-beat is negotiated via heart-beat:<outgoing-ms>,<incoming-ms> in
the CONNECT frame. Per the spec: "if <cx> is 0 (the client cannot
send heart-beats) or <sy> is 0 (the server does not want to receive
heart-beats) then there will be none; otherwise, there will be heart-beats
every MAX(<cx>,<sy>) milliseconds."
Per the STOMP 1.2 spec,
the SUBSCRIBE frame requires id (unique subscription identifier) and
destination, with an optional ack header.
| Ack mode | Spec guarantee | When to use |
|---|---|---|
auto (default) | Broker treats each delivered frame as acknowledged; no client ACK needed | Fire-and-forget; high-throughput sensors |
client | Client sends ACK; each ACK is cumulative - acknowledges all prior messages on the subscription | Batch processing where ordering matters |
client-individual | Each ACK or NACK applies only to the single message identified by the frame's id header - no cumulative effect | Independent per-message processing; DLQ workflows |
BlockingQueue<String> received = new LinkedBlockingQueue<>();
session.subscribe("/queue/orders", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) { return String.class; }
@Override
public void handleFrame(StompHeaders headers, Object payload) {
received.add((String) payload);
// ACK required when ack=client or ack=client-individual
session.acknowledge(headers.getMessageId(), true);
}
});
session.send("/queue/orders", "order-42");
String msg = received.poll(3, TimeUnit.SECONDS);
assertThat(msg).isEqualTo("order-42");
To assert client-individual behavior, subscribe with
ack: client-individual, send two messages, ACK the second before the
first, and assert the first is redelivered - confirming non-cumulative
semantics per the spec.
Per
rabbitmq.com/docs/web-stomp,
enabling the plugin: rabbitmq-plugins enable rabbitmq_web_stomp. The
plugin "listens on all interfaces on port 15674" at path /ws. Browser
clients connect as:
const ws = new WebSocket('ws://127.0.0.1:15674/ws');
const client = Stomp.over(ws);
client.connect('guest', 'guest', onConnect, onError, '/');
For server-side integration tests use the same STOMP TCP port (61613)
via the rabbitmq_stomp plugin, which per
rabbitmq.com/docs/stomp "ships
in the core distribution and handles STOMP 1.0 through 1.2." RabbitMQ
STOMP destination prefixes:
| Prefix | Meaning |
|---|---|
/queue/<name> | STOMP-managed durable queue |
/topic/<routing-key> | Topic exchange pub/sub |
/exchange/<name>/<routing-key> | Named exchange with routing key |
/amq/queue/<name> | Queue created outside the STOMP adapter |
/temp-queue/<name> | Auto-delete reply queue |
Per the RabbitMQ AMQP concepts guide, "messages are published to exchanges, which distribute message copies to queues using rules called bindings." The four exchange types:
| Type | Routing rule |
|---|---|
direct | Exact match on routing key per rabbitmq.com/docs/exchanges |
fanout | Copies to all bound queues; routing key ignored |
topic | * matches one dot-segment; # matches zero or more |
headers | Routes on message attribute map instead of routing key |
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(rabbit.getMappedPort(5672));
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// Declare a durable direct exchange and a durable queue
ch.exchangeDeclare("orders.direct", "direct", /*durable=*/true);
ch.queueDeclare("orders.eu", /*durable=*/true,
/*exclusive=*/false, /*autoDelete=*/false, null);
ch.queueBind("orders.eu", "orders.direct", "eu");
// Publish with delivery-mode=2 (persistent)
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentType("application/json")
.build();
ch.basicPublish("orders.direct", "eu", props,
"{\"id\":1}".getBytes(StandardCharsets.UTF_8));
// Consume and assert
GetResponse resp = ch.basicGet("orders.eu", /*autoAck=*/false);
assertThat(resp).isNotNull();
assertThat(new String(resp.getBody())).contains("\"id\":1");
ch.basicAck(resp.getEnvelope().getDeliveryTag(), /*multiple=*/false);
}
The Java client API is documented at rabbitmq.com/client-libraries/java-api-guide.
Per rabbitmq.com/docs/confirms:
basic.ack is used for positive acknowledgements.basic.nack is the RabbitMQ extension for negative acknowledgements
(supports multiple flag; basic.reject does not).basic.reject rejects a single message; set requeue=false to send
the message to a Dead Letter Exchange instead of back to the queue.// Prefetch = 1: broker sends at most 1 unacked message at a time
ch.basicQos(1);
boolean autoAck = false;
ch.basicConsume("orders.eu", autoAck, "consumer-tag",
new DefaultConsumer(ch) {
@Override
public void handleDelivery(String tag, Envelope env,
AMQP.BasicProperties props,
byte[] body) throws IOException {
try {
process(body);
ch.basicAck(env.getDeliveryTag(), /*multiple=*/false);
} catch (Exception e) {
// requeue=false routes to DLX
ch.basicNack(env.getDeliveryTag(),
/*multiple=*/false, /*requeue=*/false);
}
}
});
Per the confirms doc, "basic.qos sets the max number of unacknowledged
deliveries permitted on a channel; a value of zero means no limit."
Setting basicQos(1) is the recommended pattern for fair dispatch in
round-robin consumer pools.
Per rabbitmq.com/docs/publishers, publisher confirms "provide a mechanism for application developers to keep track of what messages have been successfully accepted by RabbitMQ." Enable on the channel then await confirmation:
ch.confirmSelect();
ch.basicPublish("orders.direct", "eu", null,
"ping".getBytes(StandardCharsets.UTF_8));
boolean acked = ch.waitForConfirms(5000 /*ms*/);
assertThat(acked).isTrue();
For throughput tests, use streaming confirms (asynchronous) rather than
waitForConfirms per message; the publishers doc warns that "waiting for
confirmation after each message causes a very significant negative effect
on throughput."
Send a message via STOMP (as a browser or Spring client would) and receive it via AMQP (as a backend service would), asserting the message survives the bridge:
// STOMP sender (port 61613 TCP or 15674 WS)
session.send("/exchange/orders.direct/eu",
new StompHeaders(), "order-99".getBytes());
// AMQP receiver - same queue that exchange routes to
Thread.sleep(200); // allow broker routing
GetResponse r = ch.basicGet("orders.eu", true /*autoAck*/);
assertThat(r).isNotNull();
assertThat(new String(r.getBody())).isEqualTo("order-99");
This test catches misconfigured exchange-queue bindings that unit tests on the STOMP layer alone would not reveal.
| Anti-pattern | Why it fails | Fix |
|---|---|---|
Test with ack=auto only | client / client-individual redelivery bugs ship silently | Step 3 covers ack mode matrix |
| Declare non-durable queues in integration tests | Broker restart drops queue; CI becomes flaky | Use durable=true in queueDeclare |
Use bare waitForConfirms() with no timeout | Hangs CI on unroutable messages | Pass a timeout ms value |
| Share one channel across threads | Per the Java API guide, channels are not thread-safe | One channel per thread |
Forget ch.basicQos in round-robin consumer | One slow consumer starves; others idle | Set basicQos(1) before basicConsume |
| Assert only STOMP without checking AMQP binding | Binding misconfiguration is invisible to STOMP layer | Use the bridge test in Step 7 |
@MessageMapping layer (SockJS + STOMP) adds session management
on top of raw STOMP; test it via StompClient in Spring's
spring-messaging test support, not raw TCP frames.RabbitMQContainer uses the official rabbitmq Docker
image; CI must have Docker available. Use @Container (static) for
test-class scope rather than per-test to keep suite time reasonable.mqtt-tests - QoS 0/1/2 for IoT/M2M MQTT stackswebsocket-tests - raw WebSocket frame testingnpx claudepluginhub testland/qa --plugin qa-realtime-protocolsProvides a checklist for code reviews covering functionality, security, performance, maintainability, tests, and quality. Use for pull requests, audits, team standards, and developer training.