From kafka-skills
Generates a ShadowTraffic configuration to populate a Kafka topic with realistic synthetic data by discovering schemas and cluster details from any attached Kafka MCP server.
Slash command: /kafka-skills:kafka-shadowtraffic [required: topic name or keyword] [optional: environment name]
This skill is limited to the following tools:
Generates a ShadowTraffic configuration that populates a Kafka topic with realistic synthetic data. The agent discovers everything — topic name, bootstrap servers, Schema Registry URL, key and value schemas and their serialization format — from the live cluster via whichever Kafka MCP server is attached. It then maps each schema field to the most semantically appropriate ShadowTraffic _gen function and writes a ready-to-run config alongside an exact Docker command.
Target topic and environment: $ARGUMENTS
Open your first reply with: "Running the kafka-shadowtraffic skill to set up synthetic data generation."
Copy this checklist and track your progress:
ShadowTraffic Setup Progress:
- [ ] Step 1: Discover topic, schemas, and cluster details via the attached Kafka MCP
- [ ] Step 2: Hard gate - confirm with user before generating
- [ ] Step 3: Build the ShadowTraffic config
- [ ] Step 4: Write output files
- [ ] Step 5: Lint the config with ShadowTraffic's linter
- [ ] Step 6: Hand back with Docker run command
Read references/serializer-guide.md for the full list of serializer classes per format. The high-level discovery shape:
1. Identify the attached Kafka MCP server by looking at what's available in the session. Common ones: mcp__Lenses__* (Lenses MCP, the reference implementation), mcp__Confluent__*, mcp__Aiven__*, and custom servers tagged for Kafka.
2. Discover the environment / cluster using whichever tool the MCP exposes (Lenses: list_environments; Confluent: list_clusters; others vary).
3. Search for candidate topics by keyword from the user's prompt:
   - Lenses: list_datasets(search=<keyword>) or list_topics
   - Other MCPs: list_topics, then filter by keyword
4. Fetch the key and value schemas for the chosen topic:
   - Lenses: get_dataset or get_topic_metadata, which return the schema format (AVRO, JSON, PROTOBUF, NONE) and the schema body for both the key and value subjects
   - MCPs with Schema Registry tools: get_schema(subject=<topic>-value) and get_schema(subject=<topic>-key)
   - Schema Registry REST API: /subjects/<topic>-value/versions/latest and /subjects/<topic>-key/versions/latest
5. Fetch cluster connection details (bootstrap servers and Schema Registry URL). Lenses exposes them via get_topic_metadata or list_environments; most MCPs expose them in cluster/environment metadata.
6. Read the partition count (Lenses: get_topic_metadata; most others: describe_topic or get_topic).
If no Kafka MCP is attached, or discovery returns nothing, fall back to the hard gate in Step 2 and ask the user for: topic name, bootstrap servers, Schema Registry URL (if any), key schema format + body, value schema format + body. Be explicit: "Without a Kafka MCP I can't verify the schema or cluster details — please supply them directly."
Expected output of Step 1, a recap like:
Discovered via <MCP server name>:
- Environment: staging
- Topic: orders.payment.completed
- Partitions: 12
- Bootstrap servers: localhost:9092
- Schema Registry: http://localhost:8081
- Key schema: String (no schema registration)
- Value schema: Avro — 6 fields (orderId, customerId, amount, currency, status, createdAt)
Mandatory. Do not write any file before sending one message that presents the Step 1 recap and asks the user to confirm or correct it.
Then STOP and wait for the user's reply. The only way to skip the gate is if the user has already confirmed the recap earlier in this conversation.
Defaults applied once confirmed:
- Rate: throttleMs: 500 (2 events per second, a safe rate that won't overwhelm a local cluster)
- Output files: shadowtraffic-config.json and license.env.example
Serializer selection (Step 3): consult references/serializer-guide.md for the full class name table. The key rule:
| Schema format | Serializer classes |
|---|---|
| Avro | io.confluent.kafka.serializers.KafkaAvroSerializer for key and value; add schema.registry.url to producerConfigs |
| JSON Schema | io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer for key and value; add schema.registry.url |
| Protobuf | io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer for key and value; add schema.registry.url |
| Plain string key, schemaless JSON value | org.apache.kafka.common.serialization.StringSerializer for key; io.shadowtraffic.kafka.serdes.JsonSerializer for value |
| No schema (NONE) | io.shadowtraffic.kafka.serdes.JsonSerializer for both |
Apply serializer selection independently to key and value; they can differ (e.g., a String key with an Avro value is common).
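As a sketch of the common String key + Avro value split, the kafka entry under connections might look like this. The broker and Schema Registry addresses are the localhost defaults used elsewhere in this skill and are assumptions here; substitute the discovered values.

```json
{
  "kafka": {
    "kind": "kafka",
    "producerConfigs": {
      "bootstrap.servers": "localhost:9092",
      "schema.registry.url": "http://localhost:8081",
      "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
    }
  }
}
```

Because only the value uses a Confluent serializer, only the <topic>-value subject is involved in Schema Registry lookups; the key is written as plain UTF-8 bytes.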
When a Confluent serializer is used and schema.registry.url is set, ShadowTraffic automatically downloads the registered schema from Schema Registry using TopicNameStrategy. You do not need to embed the schema inline — the generator's field structure just needs to match what the downloaded schema expects. ShadowTraffic validates and registers on write.
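Only supply avroSchemaHint or jsonSchemaHint in localConfigs if ShadowTraffic can't download the schema itself, for example when the subject isn't registered in Schema Registry or schema.registry.url is wrong.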
For each field in the discovered schema, select the most semantically appropriate _gen. Consult references/gen-types.md for the full mapping table. Summary of common patterns:
| Field pattern | Recommended _gen |
|---|---|
| `*Id`, `*_id`, `id` | {"_gen": "uuid"} |
| `*name`, `*Name` | {"_gen": "string", "expr": "#{Name.fullName}"} |
| `*email*` | {"_gen": "string", "expr": "#{Internet.emailAddress}"} |
| `*amount*`, `*price*`, `*cost*` | {"_gen": "uniformDistribution", "bounds": [1, 1000], "decimals": 2} |
| `*status*`, `*state*`, `*type*` with enum values | {"_gen": "oneOf", "choices": [...enum values from schema...]} |
| `*timestamp*`, `*createdAt*`, `*updatedAt*` | {"_gen": "now"} |
| boolean | {"_gen": "boolean"} |
| integer count | {"_gen": "uniformDistribution", "bounds": [0, 100], "decimals": 0} |
| nested object | recurse — apply same rules to each nested field |
| array field | {"_gen": "repeatedly", "target": <element gen>, "times": {"_gen": "uniformDistribution", "bounds": [1, 5]}} |
When the schema has enum symbols (Avro "type": "enum" or JSON Schema "enum": [...]), always use oneOf with the actual allowed values extracted from the schema — never invent new ones.
When the schema has a null union (Avro ["null", "string"]), use weightedOneOf to generate null some fraction of the time:
{"_gen": "weightedOneOf", "choices": [{"weight": 1, "value": null}, {"weight": 9, "value": {"_gen": "string", "expr": "#{Lorem.word}"}}]}
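To illustrate how the mapping table and the enum and null-union rules combine, here is a value generator for a hypothetical Avro record with fields customerId (string), fullName (string), email (string), amount (double), status (enum PENDING/CONFIRMED/SHIPPED), notes (["null", "string"]), shippingAddress (nested record with city and zipCode), and createdAt (long). The record and its field names are illustrative assumptions, and the Address.* Datafaker expressions are not from the mapping table.

```json
{
  "customerId": { "_gen": "uuid" },
  "fullName": { "_gen": "string", "expr": "#{Name.fullName}" },
  "email": { "_gen": "string", "expr": "#{Internet.emailAddress}" },
  "amount": { "_gen": "uniformDistribution", "bounds": [1, 1000], "decimals": 2 },
  "status": { "_gen": "oneOf", "choices": ["PENDING", "CONFIRMED", "SHIPPED"] },
  "notes": {
    "_gen": "weightedOneOf",
    "choices": [
      { "weight": 1, "value": null },
      { "weight": 9, "value": { "_gen": "string", "expr": "#{Lorem.word}" } }
    ]
  },
  "shippingAddress": {
    "city": { "_gen": "string", "expr": "#{Address.city}" },
    "zipCode": { "_gen": "string", "expr": "#{Address.zipCode}" }
  },
  "createdAt": { "_gen": "now" }
}
```

The 1:9 weights make notes null about 10% of the time, and shippingAddress is a plain nested object whose leaves are generators, which is the "recurse" rule in practice.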
{
"generators": [
{
"topic": "<topic-name>",
"connection": "kafka",
"key": <key-generator>,
"value": <value-generator>,
"localConfigs": {
"throttleMs": 500
}
}
],
"connections": {
"kafka": {
"kind": "kafka",
"producerConfigs": {
"bootstrap.servers": "<discovered-brokers>",
"key.serializer": "<selected-class>",
"value.serializer": "<selected-class>"
}
}
}
}
When Schema Registry is involved, add to producerConfigs:
"schema.registry.url": "<discovered-sr-url>"
When connecting to a secured cluster (Confluent Cloud, Aiven, MSK), add the appropriate SASL/SSL fields — see references/serializer-guide.md for complete examples.
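For instance, a Confluent Cloud connection typically adds the standard Kafka client SASL_SSL settings plus Schema Registry basic-auth settings to producerConfigs. This sketch assumes API-key (SASL/PLAIN) authentication; the angle-bracket values are placeholders, and Aiven or MSK (for example, with IAM auth) need different settings, so treat references/serializer-guide.md as authoritative.

```json
{
  "producerConfigs": {
    "bootstrap.servers": "<discovered-brokers>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='<cluster-api-key>' password='<cluster-api-secret>';",
    "schema.registry.url": "<discovered-sr-url>",
    "basic.auth.credentials.source": "USER_INFO",
    "basic.auth.user.info": "<sr-api-key>:<sr-api-secret>",
    "key.serializer": "<selected-class>",
    "value.serializer": "<selected-class>"
  }
}
```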
Write exactly two files to the current working directory:
shadowtraffic-config.json — the complete ShadowTraffic configuration built in Step 3.
license.env.example — template for the required license environment variables:
LICENSE_ID=
LICENSE_EMAIL=
LICENSE_ORGANIZATION=
LICENSE_EDITION=
LICENSE_EXPIRATION=
LICENSE_SIGNATURE=
Do not write any other files. Do not write a README unless the user asks.
After writing shadowtraffic-config.json, run the ShadowTraffic linter against it to catch typos, unrecognized keys, and structural mistakes before handing back. ShadowTraffic uses open config parsing that silently ignores unknown fields — the linter makes those mistakes visible.
Run:
docker run --env-file license.env \
-v $(pwd)/shadowtraffic-config.json:/home/config.json \
shadowtraffic/shadowtraffic:latest \
--action lint \
--config /home/config.json
If license.env doesn't exist yet (the user hasn't set up their license), run the linter with placeholder env vars so it can still validate the config structure:
docker run \
-e LICENSE_ID=lint \
-e LICENSE_EMAIL=lint \
-e LICENSE_ORGANIZATION=lint \
-e LICENSE_EDITION=lint \
-e LICENSE_EXPIRATION=lint \
-e LICENSE_SIGNATURE=lint \
-v $(pwd)/shadowtraffic-config.json:/home/config.json \
shadowtraffic/shadowtraffic:latest \
--action lint \
--config /home/config.json
If the linter reports issues: fix shadowtraffic-config.json to address every finding — unrecognized generator keys, bad localConfigs fields, misplaced parameters — and re-run the linter until it passes clean. Do not hand back with lint errors outstanding.
If Docker is not available in the user's environment: note that linting was skipped and recommend the user run the lint command themselves before starting data generation.
Final message must include all of these:
One-line summary: what topic, what schema format, how many fields.
Files written: list shadowtraffic-config.json and license.env.example with their absolute paths.
Setup instructions:
# 1. Get a ShadowTraffic license at https://shadowtraffic.io and fill in your details:
cp license.env.example license.env
# (edit license.env with your license values)
# 2. Run ShadowTraffic:
docker run --net=host --env-file license.env \
-v $(pwd)/shadowtraffic-config.json:/home/config.json \
shadowtraffic/shadowtraffic:latest \
--config /home/config.json
Field summary: a brief table showing each schema field and the _gen chosen for it, so the user can spot anything that needs tweaking before running.
Tuning note: mention that throttleMs in localConfigs controls the rate (500ms = 2 events/sec); lower it for higher throughput or raise it to slow down.
Auto-schema note (if Schema Registry was used): "ShadowTraffic will automatically download the <topic>-value schema from Schema Registry and validate generated events against it."
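As a worked example of the rate arithmetic (events per second = 1000 / throttleMs), setting throttleMs to 100 gives about 10 events per second. The sketch also caps the run with maxEvents, which is assumed here to be the ShadowTraffic local config that stops a generator after a fixed number of events; confirm it against the ShadowTraffic docs before suggesting it.

```json
{
  "localConfigs": {
    "throttleMs": 100,
    "maxEvents": 1000
  }
}
```

At 10 events per second, 1,000 events take roughly 100 seconds.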
Success criteria (see references/test-cases.md):
- shadowtraffic-config.json passes the ShadowTraffic linter with zero findings before hand-back
- The serializer matches the discovered schema format (never JsonSerializer for an Avro topic)
- _gen choices are semantically appropriate: a customerId field gets uuid, not uniformDistribution
Example 1: User says: "set up ShadowTraffic for the orders topic in staging"
Actions:
1. list_environments → pick staging. list_datasets(search="orders") → one match: orders.created.
2. get_dataset returns an Avro value schema with fields orderId (string), customerId (string), amount (double), status (enum: PENDING/CONFIRMED/SHIPPED), createdAt (long/timestamp). Bootstrap: localhost:9092. Schema Registry: http://localhost:8081.
3. StringSerializer for the key (a plain uuid string; no key schema registered) and KafkaAvroSerializer for the value; schema.registry.url set. Generators: orderId → uuid, customerId → uuid, amount → uniformDistribution [1,500] decimals:2, status → oneOf ["PENDING","CONFIRMED","SHIPPED"], createdAt → now.
4. Write shadowtraffic-config.json and license.env.example.
Result: User copies the Docker command, runs it, and sees Avro events flowing into orders.created at 2/sec.
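A sketch of the shadowtraffic-config.json this example would produce, following the Step 3 skeleton and using StringSerializer for the plain-string key per the Step 3 rule that key and value serializers are chosen independently. All values come from the example itself; only the connection name kafka is taken from the skeleton.

```json
{
  "generators": [
    {
      "topic": "orders.created",
      "connection": "kafka",
      "key": { "_gen": "uuid" },
      "value": {
        "orderId": { "_gen": "uuid" },
        "customerId": { "_gen": "uuid" },
        "amount": { "_gen": "uniformDistribution", "bounds": [1, 500], "decimals": 2 },
        "status": { "_gen": "oneOf", "choices": ["PENDING", "CONFIRMED", "SHIPPED"] },
        "createdAt": { "_gen": "now" }
      },
      "localConfigs": {
        "throttleMs": 500
      }
    }
  ],
  "connections": {
    "kafka": {
      "kind": "kafka",
      "producerConfigs": {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
      }
    }
  }
}
```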
Example 2: User says: "I need fake data flowing into my payments topic"
Actions:
- Discovery returns payments.initiated with a JSON Schema value; fields: paymentId (string), userId (string), amount (number), currency (string, enum: USD/EUR/GBP), status (string).
- KafkaJsonSchemaSerializer for the value; schema.registry.url set. Generators: paymentId → uuid, userId → uuid, amount → uniformDistribution [0.01, 5000] decimals:2, currency → oneOf ["USD","EUR","GBP"], status → oneOf ["INITIATED","PROCESSING","COMPLETED","FAILED"] (status has no enum in the schema, so plausible lifecycle values are used).
Example 3: User says: "set up ShadowTraffic to send test data to my Kafka topic"
Actions:
Result: Config is generated correctly; user is told "Without a Kafka MCP I couldn't verify these details against the live cluster — run kafka-schema-review after to confirm the generated schema matches what's registered."
Troubleshooting
No schema registered for the topic
Cause: Topic exists but nothing is registered in Schema Registry for it.
Solution: Fall back to io.shadowtraffic.kafka.serdes.JsonSerializer for both key and value (no Schema Registry needed). Ask the user to describe the expected shape of the messages so you can build generators for it. Note in the hand-back that the generated data won't be schema-validated.
Multiple topics match the keyword
Cause: Search returns more than one candidate.
Solution: List all candidates with their partition counts and schema formats, and ask the user to pick one. Do not guess.
Connection details not discoverable
Cause: The MCP doesn't expose cluster connection details, or the environment has restrictions.
Solution: Ask the user to supply the missing values directly. For bootstrap servers, the default for a local Lenses CE is localhost:9092; for Schema Registry, http://localhost:8081.
Linter reports findings
Cause: A _gen type name is misspelled, a localConfigs field doesn't exist, or a key was placed at the wrong nesting level.
Solution: Fix the flagged path in shadowtraffic-config.json and re-run the linter. Common mistakes: using start instead of startingFrom on sequentialInteger; putting throttleMs directly on the generator instead of inside localConfigs; malforming the weightedOneOf choices (each choice must be an object with weight and value).
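For reference, a generator entry that avoids those three mistakes: startingFrom on sequentialInteger, throttleMs inside localConfigs, and weightedOneOf choices written as weight/value objects. The orderNumber and channel fields are hypothetical, and <topic-name> is the same placeholder used in the Step 3 skeleton.

```json
{
  "topic": "<topic-name>",
  "connection": "kafka",
  "key": { "_gen": "uuid" },
  "value": {
    "orderNumber": { "_gen": "sequentialInteger", "startingFrom": 1000 },
    "channel": {
      "_gen": "weightedOneOf",
      "choices": [
        { "weight": 8, "value": "web" },
        { "weight": 2, "value": "mobile" }
      ]
    }
  },
  "localConfigs": {
    "throttleMs": 500
  }
}
```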
Schema not found in Schema Registry
Cause: Subject doesn't exist in Schema Registry, or schema.registry.url is wrong.
Solution: Fetch the schema body via the MCP (or ask the user to paste it) and add avroSchemaHint / jsonSchemaHint to localConfigs. See references/serializer-guide.md for the exact inline schema syntax.
## ShadowTraffic config generated
Topic: <topic-name> | Format: <Avro/JSON Schema/plain>
### Files written
- shadowtraffic-config.json <full path>
- license.env.example <full path>
### Run it
# 1. Fill in your ShadowTraffic license (get one at https://shadowtraffic.io):
cp license.env.example license.env
# 2. Start generating data:
docker run --net=host --env-file license.env \
-v $(pwd)/shadowtraffic-config.json:/home/config.json \
shadowtraffic/shadowtraffic:latest \
--config /home/config.json
### What will be generated
| Field | Generator |
|-------------|--------------------------------------|
| orderId | uuid |
| customerId | uuid |
| amount | uniformDistribution [1, 500] |
| status | oneOf ["PENDING","CONFIRMED",...] |
| createdAt | now (current timestamp) |
Rate: 2 events/sec (throttleMs: 500). Adjust `throttleMs` in the config to change throughput.
Schema Registry: ShadowTraffic will auto-download the `<topic>-value` schema and validate all generated events against it.
### Lint result
Config passed the ShadowTraffic linter with zero findings.
npx claudepluginhub lensesio/agentic-engineering-for-apache-kafka --plugin kafka-skills