Configures Spring Boot's Kafka starter via application.properties/yml or a generated KafkaConfiguration class. Useful when adding Kafka producer/consumer beans or before wiring KafkaTemplate and @KafkaListener.
Slash command: /amplicode-spring-skills:kafka-configuration
This skill is part of the Spring Agent Toolkit and is designed to work with the Spring MCP server (provided by the Amplicode IntelliJ plugin). Before doing anything else, check your tool list for any Spring MCP tool — they are exposed under the amplicode MCP server (e.g. get_project_summary, list_module_dependencies, list_application_properties_files); harnesses that flatten MCP tools into the tool list use the mcp__amplicode__ prefix on the same names.
If no Spring MCP tool is available:

- Invoke the amplicode-install skill (bundled with the Spring Agent Toolkit). It installs the Amplicode plugin and walks the user through the «Настроить Spring Agent» ("Configure Spring Agent") welcome-screen button and the MCP-client restart. After it completes, the MCP tools become available; resume this skill.
- If amplicode-install is not registered in your skill list, tell the user (in their language): "This skill needs the Amplicode IntelliJ plugin and its MCP server. Install it from https://amplicode.ru/marketplace into IntelliJ IDEA Ultimate/Community or GigaIDE, open any project, click «Настроить Spring Agent» on the Amplicode welcome screen, then restart your MCP client."

Wires Spring Boot's Kafka starter through application.properties / application.yml and, optionally, a generated KafkaConfiguration class. Ensures the starter dependency is on the module classpath.
CRITICAL: Code ONLY from examples/ files. If no matching example, STOP and ask the user.

CRITICAL: For questions with a fixed set of choices, prefer AskUserQuestion > its analogue > plain text list.

CRITICAL: Read the conversation context BEFORE running Step 1. Half the questions in Steps 2–3 may already be answered.
- Path A: serializerSource = properties (default). All six spring.kafka.* keys go into the property source: bootstrap-servers, consumer.group-id, producer key/value serializer, consumer key/value deserializer. KafkaAutoConfiguration creates ProducerFactory, ConsumerFactory, KafkaTemplate (bean kafkaTemplate), and kafkaListenerContainerFactory from them.
- Path B: serializerSource = @Configuration. Only bootstrap-servers + consumer.group-id go into the property source. A KafkaConfiguration class is generated with four beans: {prefix}ProducerFactory, {prefix}KafkaTemplate, {prefix}ConsumerFactory, kafkaListenerContainerFactory. {prefix} is the decapitalized simple name of producerValueType (only the first character is lowered). The listener factory bean name is fixed: autoconfig backs off via @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory"); any other name leaves the autoconfigured factory in place and silently routes @KafkaListener to the wrong one.

| Option | Default | Always ask? |
|---|---|---|
| producerKeyType / producerValueType / consumerKeyType / consumerValueType | java.lang.String | YES: always ask producer key + value; consumer defaults to symmetric |
| consumerGroup | none | YES (mandatory) |
| bootstrapServers | localhost:9092 | NO |
| serializerSource | properties | YES |
| className | KafkaConfiguration | NO (only for Path B) |
| packageName | mainPackage | NO (only for Path B) |
| language / bootVersion | from get_project_summary | NO |
Smart defaults. If the user says "use defaults" / "all defaults" / "minimal configuration" → skip every Always ask = NO question. Only ask the mandatory ones (consumerGroup, serializerSource, and any type the user already mentioned non-default for).
Smart answer recognition. When the user provides a value directly ("for OrderEvent messages", "group orders"), accept it without asking again. Multiple answers in one message → accept all.
Batch questions. Group related questions into one AskUserQuestion call (up to 4 questions). Recommended option first, (Recommended) in its label.
For every input: try to derive from get_project_summary, list_module_dependencies, list_application_properties_files, prior turns, the user's prompt. Only ask when context yields no clear default.
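The question-skipping rules above can be sketched as a small filter over the options table. This is only an illustration under stated assumptions: the class `QuestionPlanner`, the method `questionsToAsk`, and the collapsed `types` entry (standing in for the four type options) are hypothetical names, and every "Always ask = NO" option is assumed to resolve from context or its default.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: decides which questions still need to be asked. */
final class QuestionPlanner {

    /** Option name -> "Always ask?" flag, in the order of the options table. */
    static final Map<String, Boolean> ALWAYS_ASK = new LinkedHashMap<>();
    static {
        ALWAYS_ASK.put("types", true);            // the four key/value type options, collapsed
        ALWAYS_ASK.put("consumerGroup", true);
        ALWAYS_ASK.put("bootstrapServers", false);
        ALWAYS_ASK.put("serializerSource", true);
        ALWAYS_ASK.put("className", false);
        ALWAYS_ASK.put("packageName", false);
        ALWAYS_ASK.put("language", false);
        ALWAYS_ASK.put("bootVersion", false);
    }

    static List<String> questionsToAsk(Set<String> alreadyAnswered, boolean useDefaults) {
        List<String> questions = new ArrayList<>();
        for (Map.Entry<String, Boolean> option : ALWAYS_ASK.entrySet()) {
            String name = option.getKey();
            if (alreadyAnswered.contains(name)) continue;        // smart answer recognition
            if (!option.getValue()) continue;                    // assumed to resolve from context/default
            if (useDefaults && name.equals("types")) continue;   // "use defaults": types become java.lang.String
            questions.add(name);
        }
        return questions;
    }
}
```

With nothing answered and no "use defaults" request, the sketch yields the type, consumerGroup, and serializerSource questions, which fits into a single batched AskUserQuestion call.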
1. Resolve silently from context: language, bootVersion, mainPackage, bootstrapServers = localhost:9092, single-file propsFile.
2. One-line confirmation: AskUserQuestion with the recommended option first.
3. Always ask: consumerGroup, serializerSource, custom type FQNs.

Re-read the user's prompt and prior turns; tick off everything already stated:

| Input | Signal in the prompt |
|---|---|
| producerValueType / producerKeyType / consumerValueType / consumerKeyType | "publish X", "send X", "key as Y", "consume X", "@KafkaListener receives X". Consumer defaults to symmetric with producer. |
| consumerGroup | "group X", "consumer group id …" |
| serializerSource | "use properties", "in a @Configuration class", "properties only", "bean classes" |
| className / packageName | "name it FooConfig", "in package …" |
| language | Java / Kotlin / file extensions |
| smart defaults | "use defaults", "all defaults" → all types default to java.lang.String, only ask consumerGroup + serializerSource |
Tick → skip the corresponding question. Do not announce Step 0.
| Tool | Variable |
|---|---|
| get_project_summary | language, bootVersion, mainPackage, buildFile, modules list |
| list_module_dependencies(moduleName) | presentDeps |
| list_application_properties_files(moduleName) | propsFiles |
| list_spring_beans(moduleName) | existingBeans |
After the calls, Read each path in propsFiles and parse any existing spring.kafka.* keys into existingProps.
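Parsing the existing keys into existingProps can be sketched with the JDK's own `java.util.Properties`. This is a minimal sketch that assumes the file is in .properties format (YAML files would need a separate parser); the class name `ExistingKafkaProps` and the method `parse` are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper: extracts spring.kafka.* entries from .properties text. */
final class ExistingKafkaProps {

    /** Returns only the spring.kafka.* entries, sorted by key. */
    static Map<String, String> parse(String propertiesText) {
        Properties all = new Properties();
        try {
            all.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrow unchecked to keep the API simple.
            throw new IllegalStateException(e);
        }
        Map<String, String> kafka = new TreeMap<>();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith("spring.kafka.")) {
                kafka.put(key, all.getProperty(key));
            }
        }
        return kafka;
    }
}
```

Looking up `spring.kafka.bootstrap-servers` and `spring.kafka.consumer.group-id` in the returned map gives the values that existingBootstrapServers and existingConsumerGroup are derived from; a missing key returns null.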
Multi-module selection. If modules has more than one entry:

- Call list_module_dependencies + list_application_properties_files for each module in parallel.
- Score each module: +1 if its dependencies contain org.springframework.boot:spring-boot-starter-kafka; +1 if its property files contain any spring.kafka.* key.
- The highest-scoring module becomes moduleName.

Derived:
- bootBranch: boot3 (Boot 3.0/3.1), boot3.2 (3.2/3.3/3.4/3.5), boot4 (≥4.0). Used in Step 5 + Step 6.
- kafkaAlreadyPresent: presentDeps contains org.springframework.boot:spring-boot-starter-kafka. Skip Step 5b if true.
- singlePropsFile: len(propsFiles) == 1. Skip the props-file question in Step 3 if true.
- existingBootstrapServers: value of spring.kafka.bootstrap-servers from existingProps if present, else null. Used silently in Step 2.
- existingConsumerGroup: value of spring.kafka.consumer.group-id from existingProps if present, else null.
- existingKafkaConfig: entries in existingBeans belonging to @Configuration classes that declare any of KafkaTemplate, ProducerFactory, ConsumerFactory, *ListenerContainerFactory beans. Each entry carries the class FQN, source file path, and the bean names already declared. Used in Step 4.

Ask everything in a single AskUserQuestion call (up to 5 questions). Pre-fill defaults from context, skip already-answered:
- Producer key type? Options: java.lang.String (Recommended), java.lang.Integer, java.lang.Long, java.util.UUID, java.lang.Void, Custom (specify FQN).
- Producer value type? Same options.
- Consumer group id? Plain text, mandatory. If existingConsumerGroup is non-null → pre-fill silently.
- Bootstrap servers? Default localhost:9092. If existingBootstrapServers is non-null → pre-fill silently.
- Where to describe serializers? application.properties (Recommended) (Path A) / @Configuration class (Path B).

Consumer types default to producer types (symmetric). Only ask separately if the user indicated asymmetric types.
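Two of the Step 1 derivations, the per-module score and bootBranch, are simple enough to sketch directly. The class `Step1Derivations` and its methods are hypothetical. The sketch assumes dependencies are reported as group:artifact coordinates without a version, maps every 3.x release from 3.2 onward to boot3.2, and rejects anything older than 3.0 because the text does not cover it.

```java
import java.util.Set;

/** Hypothetical helper for the module score and the bootBranch value. */
final class Step1Derivations {

    static final String STARTER = "org.springframework.boot:spring-boot-starter-kafka";

    /** +1 if the starter is among the dependencies, +1 if any spring.kafka.* key exists. */
    static int score(Set<String> dependencies, Set<String> propertyKeys) {
        int score = dependencies.contains(STARTER) ? 1 : 0;
        boolean hasKafkaKey = propertyKeys.stream().anyMatch(k -> k.startsWith("spring.kafka."));
        return score + (hasKafkaKey ? 1 : 0);
    }

    /** Maps a Boot version such as "3.3.4" to boot3, boot3.2, or boot4. */
    static String bootBranch(String bootVersion) {
        String[] parts = bootVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        if (major >= 4) return "boot4";
        if (major == 3 && minor >= 2) return "boot3.2";
        if (major == 3) return "boot3";
        // Assumption: versions below 3.0 are out of scope for this skill.
        throw new IllegalArgumentException("Unsupported Boot version: " + bootVersion);
    }
}
```

How ties between equally scored modules are resolved is not specified in the text, so the sketch stops at computing the score.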
If the user says "use defaults" / "all defaults", skip the type questions and default all four types to java.lang.String. Only ask consumerGroup and serializerSource; bootstrapServers falls back to its default or to existingBootstrapServers.
If existingKafkaConfig is non-empty, apply Decision principle 2 (one-line confirmation), naming the existing class:
"Project already has Kafka beans in {class.fqn}. Add new beans there? (Yes/No)"

- Yes → classTarget = existing. Reuse the existing class's packageName, className, and source file path. Step 5 then uses Edit to append beans, not Write to create a new file. Inserted bean names that collide with existing ones get a numeric suffix per Step 5 rules.
- No (or existingKafkaConfig was empty) → classTarget = new:
  - className: default KafkaConfiguration; on file collision in {packageName}/{className}.{java|kt} append a numeric suffix.
  - packageName: default mainPackage; ask only if multiple @Configuration packages exist with no clear winner.

Usually answered silently from context.
Always write:
spring.kafka.bootstrap-servers={bootstrapServers}
spring.kafka.consumer.group-id={consumerGroup}
If serializerSource = properties, also append the four (de)serializer keys (FQNs from examples/serializer-mapping.md):
spring.kafka.producer.key-serializer={producerKeySerializer}
spring.kafka.producer.value-serializer={producerValueSerializer}
spring.kafka.consumer.key-deserializer={consumerKeyDeserializer}
spring.kafka.consumer.value-deserializer={consumerValueDeserializer}
If consumerKeyType OR consumerValueType is a POJO, also append:
spring.kafka.consumer.properties[spring.json.trusted.packages]={packages}
{packages} = comma-separated packages of the POJO types on the consumer side.
If consumerKeyType is a POJO, also append:
spring.kafka.consumer.properties[spring.json.key.default.type]={consumerKeyTypeFqn}
If consumerValueType is a POJO, also append:
spring.kafka.consumer.properties[spring.json.value.default.type]={consumerValueTypeFqn}
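The three POJO-dependent keys above can be derived mechanically from the consumer type FQNs. The sketch below assumes that any type outside the five built-in choices offered in the type question counts as a POJO and that POJOs are top-level classes; the authoritative mapping lives in examples/serializer-mapping.md, which is not reproduced here. `JsonConsumerProps` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: builds the extra consumer property lines for POJO types. */
final class JsonConsumerProps {

    /** Assumption: anything outside this set is treated as a POJO that needs JSON settings. */
    static final Set<String> BUILT_IN = Set.of(
            "java.lang.String", "java.lang.Integer", "java.lang.Long",
            "java.util.UUID", "java.lang.Void");

    static boolean isPojo(String fqn) {
        return !BUILT_IN.contains(fqn);
    }

    /** Package part of a top-level class FQN, or "" for the default package. */
    static String packageOf(String fqn) {
        int lastDot = fqn.lastIndexOf('.');
        return lastDot < 0 ? "" : fqn.substring(0, lastDot);
    }

    static List<String> lines(String consumerKeyType, String consumerValueType) {
        Set<String> packages = new LinkedHashSet<>(); // keeps order, drops duplicates
        if (isPojo(consumerKeyType)) packages.add(packageOf(consumerKeyType));
        if (isPojo(consumerValueType)) packages.add(packageOf(consumerValueType));

        List<String> lines = new ArrayList<>();
        if (packages.isEmpty()) return lines; // no POJO on the consumer side: nothing to add
        String prefix = "spring.kafka.consumer.properties[spring.json.";
        lines.add(prefix + "trusted.packages]=" + String.join(",", packages));
        if (isPojo(consumerKeyType)) lines.add(prefix + "key.default.type]=" + consumerKeyType);
        if (isPojo(consumerValueType)) lines.add(prefix + "value.default.type]=" + consumerValueType);
        return lines;
    }
}
```

Trusted packages are always package names, never `*` and never a class FQN, which is why the sketch strips the simple class name before joining.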
YAML equivalents are in examples/serializer-mapping.md. Overwrite existing keys; never delete unrelated ones.
If kafkaAlreadyPresent = true, skip. Otherwise add org.springframework.boot:spring-boot-starter-kafka to buildFile (same artifact id for Boot 3.x and 4.x), then call refresh_build_system_model.
@Configuration (Path B only)

- Template: examples/configuration-class/{language}-{bootBranch}.md.
- {prefix} = decapitalize({producerValueType} simple name): String → string, OrderEvent → orderEvent, UUID → uUID (only the first character is lowered).
- Bean names (checked for collisions against existingBeans from Step 1 and, for classTarget = existing, against the target class's declared beans):
  - {prefix}ProducerFactory / {prefix}KafkaTemplate / {prefix}ConsumerFactory: if a bean with that exact name exists, append a numeric suffix (stringProducerFactory_1, ...).
  - Listener factory: kafkaListenerContainerFactory. If it already exists, use {prefix}KafkaListenerContainerFactory and warn in Step 6 that @KafkaListener for {producerValueType} must set containerFactory = "{prefix}KafkaListenerContainerFactory" explicitly.
- Indentation:
  - classTarget = existing → sample indentation from the target file.
  - classTarget = new → .editorconfig → sample any existing source file → fallback 4-space.
- Imports: one import line per unique FQN after package; skip java.lang and same-package classes. Java grouping: third-party block (sorted alphabetically) + blank line + java.* block (sorted). Kotlin: kotlin.* joins the third-party block.
- Output:
  - classTarget = new → Write the full file at {module sourceRoot}/{packageName as path}/{className}.{java|kt}.
  - classTarget = existing → Read the target file, then Edit to append the four @Bean methods before the class's closing brace and merge required imports into the existing import block (no duplicates). Do not create a new file.

Match the user's conversation language. Include:
- When existing values were found, a note: «Used spring.kafka.bootstrap-servers={existingBootstrapServers} / consumer.group-id={existingConsumerGroup} from {propsFile}; overwrote them with the new values only if you provided them.»
- How to use the result: inject KafkaTemplate<{producerKeyType}, {producerValueType}> (bean kafkaTemplate for Path A, {prefix}KafkaTemplate for Path B). If the listener factory bean is named exactly kafkaListenerContainerFactory, @KafkaListener uses it by default and no explicit containerFactory is needed.
- If the listener factory was renamed: @KafkaListener for {producerValueType} must set containerFactory = "{prefix}KafkaListenerContainerFactory" explicitly.

Before finishing, verify:

- Serializer and deserializer FQNs come from the examples/serializer-mapping.md rows matching each of the four type parameters, not from memory.
- bootstrap-servers + consumer.group-id are written regardless of serializerSource.
- A @Configuration class is generated only for Path B.
- The listener factory bean is named kafkaListenerContainerFactory (or {prefix}KafkaListenerContainerFactory only when the default was already taken by a previous generation).
- The listener factory is typed ConcurrentKafkaListenerContainerFactory<{consumerKeyType}, {consumerValueType}>, never raw.
- The KafkaProperties import matches bootBranch: boot3 / boot3.2 use org.springframework.boot.autoconfigure.kafka; boot4 uses org.springframework.boot.kafka.autoconfigure.
- boot3.2 calls buildProducerProperties(sslBundles.getIfAvailable()) / buildConsumerProperties(sslBundles.getIfAvailable()); boot3 and boot4 pass no argument.
- If any consumer type is a POJO: spring.json.trusted.packages (Path A) or JsonDeserializer.TRUSTED_PACKAGES / JacksonJsonDeserializer.TRUSTED_PACKAGES (Path B) is set to the relevant package(s), never *, never the FQN.
- If consumerKeyType is a POJO: spring.json.key.default.type (Path A) or JsonDeserializer.KEY_DEFAULT_TYPE / JacksonJsonDeserializer.KEY_DEFAULT_TYPE (Path B) is set to the key type FQN.
- If consumerValueType is a POJO: spring.json.value.default.type (Path A) or JsonDeserializer.VALUE_DEFAULT_TYPE / JacksonJsonDeserializer.VALUE_DEFAULT_TYPE (Path B) is set to the value type FQN.
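The Path B bean-naming rules (prefix derivation, numeric suffix on collision, fixed listener factory name with a prefixed fallback) can be sketched as three small functions. `BeanNames` and its methods are hypothetical, and continuing the suffix sequence as _2, _3 after _1 is an assumption, since the text only shows stringProducerFactory_1.

```java
import java.util.Set;

/** Hypothetical helper for the Path B bean names. */
final class BeanNames {

    /** Lowers only the first character: String -> string, UUID -> uUID. */
    static String decapitalize(String simpleName) {
        if (simpleName.isEmpty()) return simpleName;
        return Character.toLowerCase(simpleName.charAt(0)) + simpleName.substring(1);
    }

    /** Returns base if it is free, otherwise the first free name among base_1, base_2, ... */
    static String unique(String base, Set<String> existingBeans) {
        if (!existingBeans.contains(base)) return base;
        int n = 1;
        while (existingBeans.contains(base + "_" + n)) n++;
        return base + "_" + n;
    }

    /** Keeps the fixed default name unless a bean with that name already exists. */
    static String listenerFactoryName(String prefix, Set<String> existingBeans) {
        String fixed = "kafkaListenerContainerFactory";
        return existingBeans.contains(fixed) ? prefix + "KafkaListenerContainerFactory" : fixed;
    }
}
```

When `listenerFactoryName` returns the prefixed fallback, the final summary has to tell the user to set containerFactory explicitly on the affected @KafkaListener methods.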