From streamnative-agent-skills
Create and manage StreamNative Kafka instances and Kafka clusters across dedicated, dedicated-pro, BYOC, and BYOC Pro deployments. Use when the user asks to create/list/describe/delete a Kafka Instance or KafkaCluster, configure throughput units, cluster profiles, gateways, schema registry, mTLS, Kafka RBAC, or switch snctl service context to a KafkaCluster. For BYOC substrate (cloud connections, environments, volumes, pools) open byoc-infrastructure first.
Slash command
/streamnative-agent-skills:kafka-clusters
- User asks to create, list, describe, or delete Kafka `Instance` or `KafkaCluster` resources.
- User mentions `kafkacluster`, Kafka RTU / throughput unit, `clusterProfile`, Kafka gateways, Kafka mTLS, Kafka RBAC, or Kafka cluster Schema Registry endpoint.
- User needs `snctl context use` / service-context setup for `KafkaCluster` data-plane commands.

Prefer StreamNative Cloud remote MCP when available: use `sncloud_clusters_read` / `sncloud_clusters_write` for `Instance`, `KafkaCluster`, and controller-managed cluster resources visible in the catalog; use `sncloud_resource_catalog` / `sncloud_resource_schema` before writes. Remote MCP is in public preview and disabled by default. Newly created clusters have cluster-level MCP disabled until the user enables it in the StreamNative Cloud UI, so data-plane follow-up through `sncloud_context_use_cluster` may not work immediately after create. When it does succeed, output `MCP_TOOLS_REFRESH_REQUIRED instance=<instanceName> cluster=<clusterName>` and stop so the runner/runtime can refresh tools before data-plane calls. See `../cloud-core/references/remote-mcp.md`. Use the `snctl` fallback when MCP tools are unavailable or for the CLI-specific examples below.
Remote MCP route: start with sncloud_context_whoami, then catalog/schema for clusters domain. snctl route: run standard checks (snctl version --client, snctl auth whoami, snctl config get). Require snctl >= v1.7.0 for KafkaCluster-related workflows; upgrade before create/list/describe/delete/context actions. Defer auth/install failures to cloud-core.
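The `snctl >= v1.7.0` requirement above can be checked mechanically. The following is a minimal sketch, assuming the caller has already extracted a `major.minor.patch` string from `snctl version --client` (the output format of that command is not shown in this skill, so parsing it is deliberately left out). `version_ge` is a hypothetical helper name, not part of `snctl`.

```shell
# Hypothetical helper: succeed (exit 0) when version $1 >= version $2.
# Compares numeric major.minor.patch fields; a leading "v" is ignored
# and missing fields count as 0.
version_ge() {
  printf '%s\n%s\n' "${1#v}" "${2#v}" | awk -F. '
    NR == 1 { for (i = 1; i <= 3; i++) have[i] = $i + 0 }
    NR == 2 { for (i = 1; i <= 3; i++) need[i] = $i + 0 }
    END {
      for (i = 1; i <= 3; i++) {
        if (have[i] > need[i]) exit 0
        if (have[i] < need[i]) exit 1
      }
      exit 0
    }'
}
```

A caller could gate KafkaCluster workflows with `version_ge "$installed" v1.7.0 || echo "upgrade snctl first"`, where `installed` is a hypothetical variable holding the detected client version. The comparison is numeric per field, so `v1.10.0` correctly ranks above `v1.7.0`.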
KafkaCluster-specific control-plane prerequisites:
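- Instance feature enabled (`cloud.streamnative.io/instance-enabled=true`).
- AuthV2 enabled (`CloudFeature AuthV2=true`).
- `snctl create kafkacluster` fails early if either feature is missing.
- For BYOC, the substrate must already exist; open `byoc-infrastructure` if missing.

Two layered kinds: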
- `Instance`: parent resource. Do not use `PulsarInstance` for KafkaCluster.
- `KafkaCluster`: child of an `Instance`. Controller provisions Kafka brokers plus optional Kafka REST Proxy and Schema Registry endpoint.

KafkaCluster important fields:
- `spec.instanceName`: required parent `Instance`.
- `spec.location`: required location/region.
- `spec.poolMemberRef`: required by admission on create; `snctl create` may resolve and populate it from `--location`, but raw manifests must set both `namespace` and `name`.
- `spec.throughputUnit`: requested Kafka broker capacity in RTU; defaults to `1`.
- `spec.clusterProfile`: `latency-optimized` (default in cloud-manager UI) or `cost-optimized`.
- `spec.endpointAccess[].gateway`: gateway names used to expose Kafka endpoints.
- `spec.config.custom`: Kafka broker config key/value overrides.
- `spec.mtls.enabled`: enable client-to-broker mTLS.
- `spec.rbac.enabled`: admission defaults this to `true` on create when omitted; set `false` via `--disable-rbac` / manifest when Kafka RBAC should be off.
- `metadata.annotations.cloud.streamnative.io/enable-schema-registry: "true"`: requests controller-managed `SnSchemaRegistry` + backend config.
- `metadata.annotations.cloud.streamnative.io/schemaregistry`: controller writes associated Schema Registry name, normally same as KafkaCluster.
- `metadata.annotations.cloud.streamnative.io/auth: "v2"`: controller-managed resources use AuthV2; service context expects AuthV2 token flow.

Ask deployment type before create action. If unclear:
Supported Instance.spec.type for KafkaCluster: dedicated | dedicated-pro | byoc | byoc-pro.
For remote MCP writes, compose JSON-string manifests and call sncloud_clusters_write with operation=apply, dry_run=true; only repeat with dry_run=false after validation. Use sncloud_clusters_read for list/get. Never use generic sncloud_resources_*.
snctl create instance my-instance -n my-org \
--pool streamnative/shared-aws \
--type dedicated
snctl create kafkacluster my-kafka -n my-org \
--instance-name my-instance \
--location us-east-1 \
--throughput-unit 2 \
--cluster-profile latency-optimized
# snctl resolves --location to exact PoolMember and sends spec.poolMemberRef.
# Enable Schema Registry when using manifests. `snctl create kafkacluster` does not expose
# a schema-registry flag in every version; prefer manifest when endpoint is required.
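Several of the input rules stated in this skill (name of 20 characters or fewer, throughput unit of at least 1, two accepted cluster profiles) can be checked locally before calling `snctl create kafkacluster`. The sketch below is one way to do that; `validate_kafkacluster_args` is a hypothetical helper, and the server-side admission checks remain authoritative.

```shell
# Hypothetical pre-flight check mirroring the input rules in this skill.
# Usage: validate_kafkacluster_args NAME THROUGHPUT_UNIT CLUSTER_PROFILE
# Prints the first violated rule to stderr and returns 1; returns 0 otherwise.
validate_kafkacluster_args() {
  kc_name=$1
  kc_rtu=$2
  kc_profile=$3

  # KafkaCluster names longer than 20 characters are rejected server-side.
  if [ "${#kc_name}" -gt 20 ]; then
    echo "name is longer than 20 characters: $kc_name" >&2
    return 1
  fi

  # Throughput unit must be a whole number >= 1.
  case $kc_rtu in
    ''|*[!0-9]*)
      echo "throughput unit must be an integer >= 1: $kc_rtu" >&2
      return 1 ;;
  esac
  if [ "$kc_rtu" -lt 1 ]; then
    echo "throughput unit must be >= 1: $kc_rtu" >&2
    return 1
  fi

  # Only two cluster profiles are accepted.
  case $kc_profile in
    cost-optimized|latency-optimized) ;;
    *)
      echo "unsupported cluster profile: $kc_profile" >&2
      return 1 ;;
  esac
}
```

For the example above, `validate_kafkacluster_args my-kafka 2 latency-optimized` succeeds, while a 21-character name or a throughput unit of `0` fails before any API call is made.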
Manifest path:
snctl apply -f ${CLAUDE_PLUGIN_ROOT}/skills/kafka-clusters/assets/manifests/instance-dedicated.yaml
snctl apply -f ${CLAUDE_PLUGIN_ROOT}/skills/kafka-clusters/assets/manifests/kafkacluster-dedicated.yaml
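The bundled manifests are not reproduced in this skill, so the following is only a sketch of what a raw KafkaCluster manifest might contain, built from the fields listed under "KafkaCluster important fields". `render_kafkacluster_manifest` is a hypothetical helper. The `apiVersion` is not stated in this skill, so it is left as a required input (`KAFKACLUSTER_API_VERSION`) to be copied from the bundled `kafkacluster-dedicated.yaml`. Using the `-n` organization as `metadata.namespace` is also an assumption.

```shell
# Hypothetical generator for a KafkaCluster manifest; prints YAML to stdout.
# Usage: render_kafkacluster_manifest NAME NAMESPACE INSTANCE LOCATION \
#          POOL_MEMBER_NAMESPACE POOL_MEMBER_NAME [THROUGHPUT_UNIT] [PROFILE]
# KAFKACLUSTER_API_VERSION must be set by the caller (value not given here).
render_kafkacluster_manifest() {
  : "${KAFKACLUSTER_API_VERSION:?copy apiVersion from the bundled kafkacluster manifest}"
  cat <<EOF
apiVersion: ${KAFKACLUSTER_API_VERSION}
kind: KafkaCluster
metadata:
  name: $1
  namespace: $2
  annotations:
    cloud.streamnative.io/enable-schema-registry: "true"
spec:
  instanceName: $3
  location: $4
  poolMemberRef:
    namespace: $5
    name: $6
  throughputUnit: ${7:-1}
  clusterProfile: ${8:-latency-optimized}
EOF
}
```

The output can be redirected to a file and applied with `snctl apply -f`. Raw manifests must carry both `spec.poolMemberRef.namespace` and `spec.poolMemberRef.name`, which is why both are required arguments here.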
Prerequisite: run BYOC substrate workflows from byoc-infrastructure first.
snctl get cloudconnection -n my-org -o name
snctl get cloudenvironment -n my-org -o name
snctl get poolmember -n my-org
snctl create instance my-instance -n my-org \
--pool my-org/my-pool \
--type byoc
snctl create kafkacluster my-kafka -n my-org \
--instance-name my-instance \
--pool-member-name my-org/my-pool-member \
--throughput-unit 2 \
--cluster-profile latency-optimized
Use --type byoc-pro for BYOC Pro pools.
cloud-manager creates KafkaCluster with cloud.streamnative.io/enable-schema-registry: "true" so controller provisions same-named SnSchemaRegistry and records cloud.streamnative.io/schemaregistry on KafkaCluster.
snctl apply -f ${CLAUDE_PLUGIN_ROOT}/skills/kafka-clusters/assets/manifests/kafkacluster-dedicated.yaml
snctl get snschemaregistry -n my-org
snctl get snschemaregistry my-kafka-dedicated -n my-org -o yaml
Endpoint sources:
- Kafka bootstrap: `KafkaCluster.status.serviceEndpoints`, prefer `type: external`.
- Kafka REST Proxy: `KafkaCluster.status.kafkaRestProxy.serviceEndpoint`.
- Schema Registry: `SnSchemaRegistry.status.serviceEndpoints`, prefer `type: external`.

Direct KafkaCluster does not imply a Kafka Connect endpoint; Kafka Connect remains on the PulsarCluster/KSN path unless the backend adds separate support.
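The "prefer `type: external`" rule can be expressed as a small lookup with a fallback. This is a sketch under assumptions: `kafka_bootstrap_dns` is a hypothetical helper, each `status.serviceEndpoints` entry is assumed to carry `type` and `dnsName` fields as described in this skill, and the JSONPath filter syntax is the same one used in the readiness check elsewhere in this document.

```shell
# Hypothetical helper: print the Kafka bootstrap DNS name for a KafkaCluster,
# preferring the external endpoint and falling back to the internal one.
# Usage: kafka_bootstrap_dns CLUSTER ORG
kafka_bootstrap_dns() {
  for endpoint_type in external internal; do
    dns=$(snctl get kafkacluster "$1" -n "$2" \
      -o jsonpath="{.status.serviceEndpoints[?(@.type==\"$endpoint_type\")].dnsName}") || return 1
    if [ -n "$dns" ]; then
      printf '%s\n' "$dns"
      return 0
    fi
  done
  echo "no serviceEndpoints reported yet for $1" >&2
  return 1
}
```

If a cluster reports more than one endpoint of the same type, the JSONPath result may contain several names, so callers that need exactly one should split the output themselves.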
snctl get instance -n my-org
snctl get kafkacluster -n my-org
snctl get kafkacluster my-kafka -n my-org -o yaml
snctl describe kafkacluster my-kafka -n my-org
# Delete child before parent. Use --wait=false for async cloud teardown.
snctl delete kafkacluster my-kafka -n my-org --wait=false
snctl delete instance my-instance -n my-org --wait=false
snctl get kafkacluster my-kafka -n my-org --ignore-not-found
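Because `--wait=false` returns before teardown finishes, the parent Instance should only be deleted once the child is gone. The loop below is a minimal polling sketch built on the `--ignore-not-found` command above. `wait_for_kafkacluster_gone` is a hypothetical helper, and it assumes that `--ignore-not-found` produces empty output once the resource no longer exists.

```shell
# Hypothetical helper: poll until a KafkaCluster no longer exists.
# Usage: wait_for_kafkacluster_gone CLUSTER ORG [MAX_ATTEMPTS] [SLEEP_SECONDS]
# Assumes `--ignore-not-found` yields empty output once the resource is gone.
wait_for_kafkacluster_gone() {
  attempts=${3:-60}
  interval=${4:-10}
  while [ "$attempts" -gt 0 ]; do
    remaining=$(snctl get kafkacluster "$1" -n "$2" --ignore-not-found) || return 1
    [ -z "$remaining" ] && return 0
    attempts=$((attempts - 1))
    sleep "$interval"
  done
  echo "timed out waiting for $1 to be deleted" >&2
  return 1
}
```

A typical sequence would be the KafkaCluster delete with `--wait=false`, then `wait_for_kafkacluster_gone my-kafka my-org`, then the Instance delete. The default of 60 attempts at 10-second intervals is an arbitrary choice, not a documented teardown time.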
Preferred: edit manifest and snctl apply -f from source control.
Interactive edit:
snctl edit kafkacluster my-kafka -n my-org
For config, prefer manifest because manual nested-map edits are easy to get wrong.
snctl get kafkacluster my-kafka -n my-org -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}'
snctl describe kafkacluster my-kafka -n my-org
snctl get kafkacluster my-kafka -n my-org -o jsonpath='{.status.serviceEndpoints}'
snctl get kafkacluster my-kafka -n my-org -o jsonpath='{.status.kafkaRestProxy.serviceEndpoint}'
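The readiness query above returns a single point-in-time value, so scripted workflows usually wrap it in a loop. The following is a sketch, not a documented workflow: `wait_for_kafkacluster_ready` is a hypothetical helper, and it assumes the `Ready` condition reports the string `True` when the cluster is usable, following the usual Kubernetes condition convention.

```shell
# Hypothetical helper: poll the Ready condition until it reports "True".
# Usage: wait_for_kafkacluster_ready CLUSTER ORG [MAX_ATTEMPTS] [SLEEP_SECONDS]
# Assumes the Ready condition status is the string "True" when ready.
wait_for_kafkacluster_ready() {
  attempts=${3:-60}
  interval=${4:-10}
  while [ "$attempts" -gt 0 ]; do
    ready=$(snctl get kafkacluster "$1" -n "$2" \
      -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}') || return 1
    [ "$ready" = "True" ] && return 0
    attempts=$((attempts - 1))
    sleep "$interval"
  done
  echo "timed out waiting for $1 to become Ready (last status: ${ready:-unknown})" >&2
  return 1
}
```

Once this returns successfully, the endpoint queries above are more likely to return populated values. If it times out, `snctl describe kafkacluster` is the next place to look.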
Status endpoints:
- `status.serviceEndpoints[type=external].dnsName`: preferred Kafka bootstrap endpoint.
- `status.serviceEndpoints[type=internal].dnsName`: fallback/internal bootstrap endpoint.
- `status.kafkaRestProxy.serviceEndpoint`: Kafka REST Proxy URL host.
- `SnSchemaRegistry` is usually named the same as the KafkaCluster. Trust annotation `cloud.streamnative.io/schemaregistry` first; fall back to the same-name `SnSchemaRegistry`.

Remote MCP route: use the fixed Kafka cluster endpoint, or the organization-level `sncloud_context_available_clusters` → `sncloud_context_use_cluster`. After a successful org-level cluster switch, output `MCP_TOOLS_REFRESH_REQUIRED instance=<instanceName> cluster=<clusterName>` and stop so the runner/runtime refreshes MCP tools. After resume, use feature-gated Kafka tools such as `kafka_admin_topics_read/write`, `kafka_admin_groups_read/write`, `kafka_admin_sr_read/write`, `kafka_admin_connect_read/write`, `kafka_client_produce`, and `kafka_client_consume`. Both routes require cluster-level MCP enabled for the target cluster in the StreamNative Cloud UI; newly created clusters default to disabled.
snctl route: KafkaCluster context uses Instance/AuthV2 flow. It exchanges current user or selected service account grant, then writes Kafka client config.
snctl context use -n my-org --instance my-instance --kafka-cluster my-kafka
snctl kafka admin topics list
snctl kafka admin groups list
Important auth behavior:
- Kafka client config uses a token credential (`token:<jwt>`) for KafkaCluster contexts.
- `snctl kafka connect ...` can be unavailable for direct KafkaCluster context when no Kafka Connect URL exists; use PulsarCluster/KSN context for Connect.

Standard rules from cloud-core apply. KafkaCluster-specific:
- `metadata.name` / CLI cluster name for KafkaCluster must be 20 characters or fewer. Longer names fail server-side with `name is longer than 20 characters`.
- `--instance-name` required.
- `--location` or `--pool-member-name` required for `snctl create`; direct manifests must include `spec.poolMemberRef.name` and `spec.poolMemberRef.namespace`.
- `--throughput-unit` must be `>= 1`; default `1`.
- `--cluster-profile cost-optimized|latency-optimized`.
- `--gateways <name>` repeatable.
- `--config key=value` repeatable; becomes `spec.config.custom`.
- `--catalog <name>` associates one catalog for compaction/lakehouse workflows.
- `--mtls-enabled` sets `spec.mtls.enabled=true`.
- `--disable-rbac` sets `spec.rbac.enabled=false`; leaving it unset enables RBAC because admission defaults it to `true` on create.
- Schema Registry is requested by annotation (`cloud.streamnative.io/enable-schema-registry: "true"`); use a manifest if the current `snctl create kafkacluster --help` lacks a flag.
- `sncloud_clusters_write` manifest is a JSON string, not YAML/object. Omit `status` and read-only metadata copied from read output.
- Cluster-level MCP must be enabled before using `sncloud_context_use_cluster` or Kafka data-plane MCP tools on that cluster. After `sncloud_context_use_cluster` succeeds, emit `MCP_TOOLS_REFRESH_REQUIRED instance=<instanceName> cluster=<clusterName>` and wait for runner/runtime refresh, because the selected cluster context changes the session-scoped tool surface. ToolSearch is not a refresh.

Pitfalls:

- `KafkaCluster` names must be <=20 characters; use short smoke-test names.
- The parent is an `Instance`, not a `PulsarInstance`.
- `--pool-member-name` must belong to the Instance pool; snctl rejects a pool member that does not match `Instance.spec.poolRef`.
- `--location` must resolve to exactly one matching pool member. An ambiguous location requires an explicit `--pool-member-name`.
- `throughputUnit` is RTU capacity, not broker replica count. Controller translates RTU to resources unless backend config overrides.
- `clusterProfile` only accepts `cost-optimized` or `latency-optimized`.
- Check `snschemaregistry` if the Schema Registry endpoint is missing.
- Do not read the Schema Registry endpoint from `KafkaCluster.status.serviceEndpoints`; cloud-manager may expose confusing generated fields, but the authoritative source is `SnSchemaRegistry.status.serviceEndpoints`.
- For a direct KafkaCluster context, a missing `KafkaConnectUrl` is expected.
- `snctl delete` defaults to `--wait=true` and can block during cloud teardown. Use `--wait=false`, then poll.

npx claudepluginhub streamnative/streamnative-agent-skills --plugin streamnative-agent-skills