From streamnative-agent-skills
Administer the Kafka protocol on StreamNative Cloud via `snctl kafka` — topics, consumer groups, partitions, schemas (Schema Registry), and Kafka Connect connectors; produce/consume Kafka messages. Use when the user mentions Kafka, Kafka topics/groups/partitions, Schema Registry, or Kafka Connect connectors. For Pulsar-native administration use pulsar-admin.
How this skill is triggered — by the user, by Claude, or both
Slash command
/streamnative-agent-skills:kafka-adminThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
- User mentions Kafka, Kafka topics, Kafka consumer groups.
Prefer StreamNative Cloud remote MCP when available: use fixed Kafka cluster endpoint or organization-level sncloud_context_use_cluster; after successful org-level selection, output MCP_TOOLS_REFRESH_REQUIRED instance=<instanceName> cluster=<clusterName> and stop so the runner/runtime can refresh MCP tools; after resume, use feature-gated Kafka tools (kafka_admin_*_read, kafka_admin_*_write, kafka_client_*). Remote MCP is public preview and disabled by default; cluster-level MCP must be enabled in StreamNative Cloud UI before fixed endpoints or sncloud_context_use_cluster can target a cluster. See ../cloud-core/references/remote-mcp.md. Use snctl fallback when MCP tools are unavailable or for CLI-specific examples below.
Remote MCP route: verify sncloud_context_whoami; for org-level session select cluster via sncloud_context_available_clusters / sncloud_context_use_cluster. After a successful sncloud_context_use_cluster, output exactly MCP_TOOLS_REFRESH_REQUIRED instance=<instanceName> cluster=<clusterName> and stop before using Kafka tools; ToolSearch is not a refresh substitute. If sncloud_context_use_cluster reports cluster MCP disabled/not enabled, ask user to enable cluster-level MCP in StreamNative Cloud UI or use snctl. snctl route: standard checks plus:
snctl describe pulsarcluster <name> -n <org> — look for Kafka-related status fields/annotations.identity).On StreamNative Cloud, Kafka is implemented by KoP (Kafka-on-Pulsar). Kafka primitives map 1:1 to Pulsar primitives:
| Kafka concept | Pulsar implementation |
|---|---|
| Topic | Persistent topic in a dedicated kafka/ namespace |
| Consumer group | Pulsar subscription (Failover type) |
| Schema Registry | Per-topic Pulsar schema storage, exposed via the KoP Schema Registry REST API |
| Kafka Connect | Connector runtime hosted alongside the Kafka protocol |
Because of this mapping, Kafka ACLs are enforced via the underlying Pulsar role-based model.
Remote MCP equivalents use documented feature-gated split tool names when enabled: kafka_admin_topics_read/write, kafka_admin_groups_read/write, kafka_admin_sr_read/write, kafka_admin_connect_read/write, kafka_admin_partitions_write, plus kafka_client_produce and kafka_client_consume. Use _read tools for list/get/status/describe and _write tools for create/update/delete/reset/compatibility/partition operations. Do not invent sncloud_resources_*, old unsplit admin names, or undeclared admin tool names.
snctl kafka admin topics create my-topic --partitions 3 --replication-factor 3
snctl kafka admin topics list
snctl kafka admin topics describe my-topic
snctl kafka admin groups list
snctl kafka admin groups describe my-group
# Reset group offsets. Without --execute, reset-offsets is a dry run.
snctl kafka admin groups reset-offsets my-group \
--topic my-topic --oldest --execute
snctl kafka admin groups reset-offsets my-group \
--topic my-topic --newest --execute
snctl kafka admin groups reset-offsets my-group \
--topic my-topic --offset 1000 --execute
snctl kafka admin groups delete my-group
snctl kafka admin schemas create my-topic-value \
-f ./schema.avsc -t AVRO
snctl kafka admin schemas get my-topic-value
snctl kafka admin schemas list
snctl kafka admin schemas delete subject my-topic-value -f
snctl kafka connect list
snctl kafka connect create --name my-connector --config ./connector.json
snctl kafka connect status my-connector
snctl kafka connect restart my-connector
snctl kafka connect pause my-connector
snctl kafka connect resume my-connector
snctl kafka connect delete my-connector
snctl kafka client produce --topic my-topic --message "hello" --num-times 10
snctl kafka client consume --topic my-topic --group cli --num-messages 10 --timeout 60s
# The bootstrap server URL is on the cluster status
snctl describe pulsarcluster <cluster-name> -n <org> -o yaml | grep -A1 kafka
snctl.sncloud_context_use_cluster works only when cluster-level MCP is enabled on the target cluster; new clusters default to disabled until enabled in StreamNative Cloud UI. After it succeeds, emit MCP_TOOLS_REFRESH_REQUIRED instance=<instanceName> cluster=<clusterName> and wait for runner/runtime refresh because selected cluster context changes the session-scoped tool surface. Do not use ToolSearch as refresh.--replication-factor on KoP is accepted but physical replication is governed by the underlying Pulsar cluster, not the Kafka config. Setting replication-factor: 3 on a cluster with bookkeeper.replicas: 1 is meaningless.BACKWARD; switching to NONE disables compatibility checks and allows breaking changes.snctl kafka connect connector configs are JSON (not YAML) — mirror the upstream Kafka Connect REST API config shape.--from-beginning reads can hang or leave the group briefly Stable after forced process stops on KoP-backed clusters.npx claudepluginhub streamnative/streamnative-agent-skills --plugin streamnative-agent-skillsSearches MemPalace before answering questions about past work, people, projects, or prior decisions. Returns verbatim stored content instead of guessing from model memory.
Guides Payload CMS config (payload.config.ts), collections, fields, hooks, access control, APIs. Debugs validation errors, security, relationships, queries, transactions, hook behavior.
Implements vector databases with Pinecone, Weaviate, Qdrant, Milvus, pgvector for semantic search, RAG, recommendations, and similarity systems. Optimizes embeddings, indexing, and hybrid search.