Use when the user asks to create a GlassFlow ETL pipeline. Walks through source configuration (Kafka or OTLP), optional dedup/filter/transform stages, and ClickHouse sink mapping, then submits the v3 config to the GlassFlow API. Triggers on phrases like "create a pipeline", "set up a Kafka to ClickHouse pipeline", "ingest logs from OTLP into ClickHouse".
Slash command: `/glassflow-agent-skills:create-pipeline`
Turn the user's intent into a [v3 pipeline config](https://docs.glassflow.dev/configuration/pipeline-config-reference) and submit it to the GlassFlow API at `POST /api/v1/pipeline`.
API endpoint, for example `http://localhost:8081`.

Ask the user upfront. Do not invent values, especially credentials or table names.
Pipeline identity

- `pipeline_id`: lowercase letters/numbers/hyphens, 5-40 chars, starts with a letter, ends with a letter or number, no consecutive hyphens. Example: `orders-prod`.
- `name`: human-readable label.

Source (exactly one in v1 of this skill)

- Type: `kafka` or `otlp.logs` or `otlp.metrics`.
- Kafka: `brokers[]`, `protocol`, `mechanism` (if SASL), `username`, `password`, `topic`, `schema_fields[]` (each `{name, type}`; `type` is `string | int | float | bool | array | map`).
- OTLP: `source_id`. Schema is fixed by signal type.

Optional transforms (any subset, applied in order)

- `dedup`: key field + time window (e.g. `30s`, `1m`, `1h`).
- `filter`: an expr-lang expression returning bool. Events where the expression is TRUE are KEPT.
- `stateless`: array of `{expression, output_name, output_type}` entries.

Sink (ClickHouse)

- Connection: `host`, `port` (default 9000), `http_port` (default 8123), `database`, `username`, `password`, `secure` (bool).
- `table` name.
- `mapping[]`: each entry `{name, column_name, column_type}`. `name` references either a source `schema_fields` entry or a stateless transform `output_name`.
- Batching: `max_batch_size` (default 1000), `max_delay_time` (default 60s).

Skeleton:
```json
{
  "version": "v3",
  "pipeline_id": "<id>",
  "name": "<name>",
  "sources": [{"type": "kafka", "source_id": "...", "topic": "...", "connection_params": {...}, "schema_fields": [...]}],
  "transforms": [],
  "sink": {
    "type": "clickhouse",
    "connection_params": {"host": "...", "port": "9000", "http_port": "8123", "database": "...", "username": "...", "password": "...", "secure": false},
    "table": "...",
    "max_batch_size": 1000,
    "max_delay_time": "5s",
    "mapping": [{"name": "...", "column_name": "...", "column_type": "..."}]
  }
}
```
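Before writing the file, the identity and mapping rules can be checked locally. The following is a minimal Python sketch, not part of GlassFlow: the helper names `check_pipeline_id` and `unknown_mapping_names` are hypothetical, the regex is one reading of the `pipeline_id` rule stated in this skill, and the `extra_names` parameter is an assumption standing in for stateless transform `output_name` values, since the layout of entries inside `transforms` is not shown here.

```python
import re

# One reading of the pipeline_id rule: 5-40 chars, starts with a lowercase
# letter, ends with a letter or digit, letters/digits/hyphens in between.
PIPELINE_ID_RE = re.compile(r"^[a-z][a-z0-9-]{3,38}[a-z0-9]$")


def check_pipeline_id(pipeline_id: str) -> bool:
    """Return True if the ID fits the naming rule and has no consecutive hyphens."""
    return bool(PIPELINE_ID_RE.fullmatch(pipeline_id)) and "--" not in pipeline_id


def unknown_mapping_names(config: dict, extra_names=()) -> list:
    """Return sink mapping names that match no source schema field.

    extra_names is a hypothetical hook for stateless transform output names;
    pass them in explicitly rather than guessing the transforms layout.
    """
    known = set(extra_names)
    for source in config.get("sources", []):
        for field in source.get("schema_fields", []):
            known.add(field["name"])
    mapping = config.get("sink", {}).get("mapping", [])
    return [entry["name"] for entry in mapping if entry["name"] not in known]
```

A non-empty result from `unknown_mapping_names` is a prompt to ask the user which field they meant, not a reason to guess a replacement.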
Write it to /tmp/pipeline-<pipeline_id>.json and cat it back so the user sees what will be submitted.
Always confirm the destination before submitting. Example:
> "About to create pipeline `orders-prod` at `http://localhost:8081`. Sink writes to `analytics.orders`. Proceed?"
Wait for explicit yes.
```bash
curl -s -o /tmp/pipeline-resp.json -w "%{http_code}\n" -X POST \
  "<api-endpoint>/api/v1/pipeline" \
  -H 'Content-Type: application/json' \
  --data @/tmp/pipeline-<id>.json
cat /tmp/pipeline-resp.json
```
Expected: HTTP 200 with body {}.
| HTTP | Likely cause | Next move |
|---|---|---|
| 400 / 422 | API rejected the config. Body contains a specific error path. | Read the error, identify the offending field, propose a fix, ask the user before resubmitting. |
| 409 | Pipeline ID already exists. | Pick a different ID, or DELETE /api/v1/pipeline/<id> first if the user confirms removal is OK. |
| 500 | Server-side issue. | Capture the body verbatim, show the user, do not retry silently. |
| 000 / timeout | Network or unreachable API. | Ask the user to confirm endpoint and any port-forward state. |
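The table above can be read as a simple dispatch on the status string that curl prints via `%{http_code}`. A minimal Python sketch follows; the function name `next_move` and the wording of the returned messages are illustrative, and the final fallback branch for unlisted codes is an assumption, not something the GlassFlow API documents.

```python
def next_move(http_code: str) -> str:
    """Map the status string printed by curl to the action from the table."""
    if http_code == "200":
        return "created: wait for overall_status to reach Running"
    if http_code in ("400", "422"):
        return "rejected: read the error path, propose a fix, ask before resubmitting"
    if http_code == "409":
        return "conflict: pick a different ID, or delete the existing pipeline if the user confirms"
    if http_code == "500":
        return "server error: show the body verbatim, do not retry silently"
    if http_code == "000":
        return "unreachable: ask the user to confirm the endpoint and port-forward state"
    # Assumption: anything not listed in the table is surfaced to the user as-is.
    return "unexpected: show the status and body to the user"
```

Every branch other than `200` ends with the user in the loop, which matches the rule of never resubmitting or retrying silently.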
A successful POST means the config was accepted, not that the pipeline is ready to process events. The operator reconciles the workloads in the background; the pipeline only ingests once `overall_status` reaches `Running`. Poll the health endpoint until then:
```bash
ENDPOINT="<api-endpoint>"
PID="<id>"
for i in {1..60}; do
  STATUS=$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status','?'))")
  echo "  attempt $i: overall_status=$STATUS"
  case "$STATUS" in
    Running) echo "  ready"; break ;;
    Failed) echo "  pipeline failed; capture the health body and surface to the user"; exit 1 ;;
    Stopping|Stopped|Terminating) echo "  pipeline is being torn down; aborting wait"; exit 1 ;;
  esac
  sleep 5
done
```
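The same wait can be sketched in Python, which makes the timeout case explicit: the shell loop falls through silently after 60 attempts. This is an illustrative sketch under stated assumptions, not GlassFlow tooling. The names `wait_for_running` and `health_fetcher` and the `"TimedOut"` sentinel are hypothetical; the health URL and the `overall_status` key are taken from the loop above.

```python
import json
import time
import urllib.request
from typing import Callable

# Statuses at which the shell loop above stops waiting and exits with an error.
ABORT_STATUSES = {"Failed", "Stopping", "Stopped", "Terminating"}


def health_fetcher(endpoint: str, pipeline_id: str) -> Callable[[], str]:
    """Build a function that reads overall_status from the health endpoint."""
    url = f"{endpoint}/api/v1/pipeline/{pipeline_id}/health"

    def fetch() -> str:
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.load(resp).get("overall_status", "?")

    return fetch


def wait_for_running(
    fetch_status: Callable[[], str],
    attempts: int = 60,
    delay_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until Running, an abort status, or the attempt budget runs out.

    Returns the decisive status, or the hypothetical sentinel "TimedOut".
    """
    for _ in range(attempts):
        status = fetch_status()
        if status == "Running" or status in ABORT_STATUSES:
            return status
        sleep(delay_s)
    return "TimedOut"
```

Usage would look like `wait_for_running(health_fetcher("<api-endpoint>", "<id>"))`. Passing the fetch and sleep functions as parameters keeps the loop testable without a live API.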
Typical transition: Created immediately after POST, then Running within 10-60 seconds once the operator has stamped the workloads. If the status stays at Created for more than ~2 minutes, surface the full health body to the user; the most common causes are missing K8s resources or an image-pull issue on the data-plane Pods.
Note: Running reflects control-plane state, not data flow. The components are up; whether events are actually moving needs the verify step below.
```bash
curl -s "<api-endpoint>/api/v1/pipeline/<id>" | head -c 500
```
Confirm the pipeline shape is what you submitted. Then exercise it:
- Kafka source: produce a few test messages to the topic (use the `send-kafka-messages` skill if available).
- OTLP source: send to `<otlp-endpoint>/v1/logs` (or `/v1/metrics`) with header `x-glassflow-pipeline-id: <id>`.

Query ClickHouse to confirm rows landed:
```sql
SELECT count(*), max(<timestamp_column>) FROM <db>.<table>
```
- `pipeline_id` must match `^[a-z][a-z0-9-]{3,38}[a-z0-9]$`, no `--`.
- OTLP sources need the `x-glassflow-pipeline-id` header on every request. The receiver returns 400 without it; with an unknown ID it returns a 5xx, which leaves the sender hanging on retries.
- Set `compression: none` on the `otlp_http` exporter.

When the pipeline is created successfully, report:

- The path to the saved config (`/tmp/pipeline-<id>.json`).

When the API rejects, surface the error verbatim and offer one fix proposal.
Install: `npx claudepluginhub glassflow/agent-skills --plugin glassflow-agent-skills`