Use when a GlassFlow pipeline is experiencing backpressure, sustained sink lag, or throughput pain. Reads GlassFlow's emitted Prometheus metrics to confirm the bottleneck (backpressure gauges, stream depth ratio, source vs sink rate, sink batch size distribution), proposes targeted changes to replicas, batch sizes, stream limits, or dedup storage, applies them via the appropriate API endpoint, and verifies improvement by re-reading the same metrics. Triggers on phrases like "pipeline has backpressure", "tune pipeline X", "throughput is too low", "size resources for pipeline Y".
Slash command: `/glassflow-agent-skills:tune-pipeline`
Move a pipeline from "barely keeping up" to "comfortably keeping up" without overprovisioning. Confirm the bottleneck from metrics, propose a specific change tied to a measured signal, apply it via the right endpoint, then re-read the metrics to verify the change helped.
- GlassFlow API at `http://localhost:8081`; ask the user if remote.
- Prometheus HTTP API at `<prometheus>/api/v1/query` and `/api/v1/query_range`.
- `/metrics` endpoint at `{release}-otel-collector.{namespace}.svc.cluster.local:9090/metrics`.

Inputs:

- `pipeline_id`: required.
- `prometheus_endpoint`: required.
- `api_endpoint`: optional; default `http://localhost:8081`.
- `metric_namespace`: optional; defaults to `glassflow`.

Snapshot the current state of both the configuration and the metrics that describe the symptom. You need a clean baseline so step 6 (verify) has something to compare against.
ENDPOINT="<api-endpoint>"
PID="<id>"
NS="<metric_namespace>"
PROM="<prometheus_endpoint>"
# Config + resources
curl -s "$ENDPOINT/api/v1/pipeline/$PID" > /tmp/tune-$PID-pipeline.json
curl -s "$ENDPOINT/api/v1/pipeline/$PID/resources" > /tmp/tune-$PID-resources.json
curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -m json.tool
# Save the symptom metrics as a baseline (run all of these)
{
echo "=== source rate (events/sec, 5m) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_kafka_records_read_total{pipeline_id=\"$PID\"}[5m])"
echo "=== sink rate (events/sec, 5m) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_clickhouse_records_written_total{pipeline_id=\"$PID\"}[5m])"
echo "=== ingestor backpressure active ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_ingestor_backpressure_active{pipeline_id=\"$PID\"}"
echo "=== stream depth ratio ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_stream_depth_ratio{pipeline_id=\"$PID\"}"
echo "=== sink batch size records (p50, p95) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.5, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.95, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
echo "=== sink retries (outcome) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=sum by (outcome) (rate(${NS}_gfm_sink_retries_total{pipeline_id=\"$PID\"}[5m]))"
} > /tmp/tune-$PID-before.txt
The pipeline must be Running for metrics to reflect live state. If it is Failed or Stopped, use the debug-pipeline skill first.
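The baseline block above repeats the same two query shapes for several metrics. A small helper could generate them, so the before and after snapshots cannot drift apart. This is a minimal sketch, assuming every metric follows the `${NS}_gfm_<name>` pattern seen above. The function names `gf_rate_query` and `gf_quantile_query` are hypothetical and not part of GlassFlow.

```shell
# Hypothetical helpers (not part of GlassFlow): build the PromQL strings used
# in the baseline so before/after snapshots use identical queries.
NS="${NS:-glassflow}"      # metric namespace; "glassflow" is the documented default
PID="${PID:-my-pipeline}"  # placeholder pipeline id, for illustration only

# rate() of a counter over a window (default 5m)
gf_rate_query() {
  printf 'rate(%s_gfm_%s{pipeline_id="%s"}[%s])\n' "$NS" "$1" "$PID" "${2:-5m}"
}

# histogram_quantile() over a *_bucket series (default window 5m)
gf_quantile_query() {
  printf 'histogram_quantile(%s, sum by (le) (rate(%s_gfm_%s_bucket{pipeline_id="%s"}[%s])))\n' \
    "$1" "$NS" "$2" "$PID" "${3:-5m}"
}

gf_rate_query kafka_records_read_total
gf_quantile_query 0.95 sink_batch_size_records
```

Each string can be passed to the same call used above, for example `curl -sG "$PROM/api/v1/query" --data-urlencode "query=$(gf_rate_query kafka_records_read_total)"`.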
Each row below names the canonical metric signature and the matching config knob. Pick the row that matches the user's symptom; do not propose changes without a matching metric.
| Metric signature | Likely bottleneck | Tunable |
|---|---|---|
| `ingestor_backpressure_active == 1` sustained, `stream_depth_ratio` near 1.0 | Downstream consumer slower than the source; NATS stream filling | Scale the slowest downstream component (often sink replicas) or grow the stream (`resources.nats.stream.max_messages` / `.max_bytes`) |
| `kafka_records_read_total` rate growing but consumer-group lag also growing, ingestor CPU low | Ingestor parallelism capped by partition count | Increase Kafka topic partitions (Kafka-side change, outside GlassFlow), then increase `resources.sources[].replicas` |
| `kafka_records_read_total` rate flat near zero with non-zero topic lag | Ingestor under-provisioned per replica | Raise `resources.sources[].limits.cpu` and `.memory` |
| `clickhouse_records_written_total` rate consistently below source rate; `sink_batch_size_records` p95 hitting the configured `max_batch_size` | Sink throttled by batch frequency / size | Tune `sink.max_batch_size` upward and/or `sink.max_delay_time` downward via `/edit`, or scale `resources.sink.replicas` |
| `sink_retries_total{outcome="retry"}` rate > 0 sustained | Sink hitting retryable ClickHouse errors (overload, connection drops) | Raise `resources.sink.replicas`; check ClickHouse capacity separately |
| `sink_retries_total{outcome="exhausted"}` rate > 0 | Sink giving up on batches; data loss risk | Out of tuning scope: surface to the user and run debug-pipeline to find the root cause |
| `processing_duration_seconds` bucket high on `stage=dedup_*` with growing PVC | Dedup window too long for storage budget | Shrink dedup `time_window` (via `/edit`) or grow `resources.transform[].storage.size` |
| `sink_errors_by_classification_total{classification="terminal"}` > 0 | Schema or type mismatch on the destination table | Out of tuning scope: this is a config problem, not a sizing problem |
If no row matches, do not guess. Re-run debug-pipeline or ask the user to narrow the signal.
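The sink-side rows of the table above can be read as a decision procedure over the baseline readings. The sketch below is one possible encoding, not a GlassFlow feature. It assumes "near 1.0" means >= 0.9 and that "hitting the configured max" means p95 >= 95% of `max_batch_size`. It also assumes data-loss signals should be checked first and that the caller has already confirmed the readings are sustained. The name `gf_classify` and its output labels are hypothetical.

```shell
# Hypothetical helper: map baseline readings to a row of the table above.
# Assumptions (not from GlassFlow): thresholds 0.9 and 95%, and the check order.
# Args: bp_active depth_ratio source_rate sink_rate p95_batch max_batch retry_rate exhausted_rate
gf_classify() {
  awk -v bp="$1" -v depth="$2" -v src="$3" -v snk="$4" \
      -v p95="$5" -v cap="$6" -v retry="$7" -v exhausted="$8" 'BEGIN {
    if (exhausted > 0)
      print "out-of-scope: sink is exhausting retries, run debug-pipeline"
    else if (retry > 0)
      print "sink-retries: raise resources.sink.replicas, check ClickHouse capacity"
    else if (snk < src && p95 >= 0.95 * cap)
      print "sink-batch-bound: tune sink.max_batch_size / sink.max_delay_time or scale sink replicas"
    else if (bp == 1 && depth >= 0.9)
      print "downstream-slow: scale the slowest downstream component or grow the NATS stream"
    else
      print "no-match: do not guess, narrow the signal"
  }'
}

# Source 14,800/s, sink 5,200/s, p95 batch at the 5,000 cap: prints the sink-batch-bound row
gf_classify 1 0.97 14800 5200 5000 5000 0 0
```

Rows that need signals outside the baseline (consumer-group lag, ingestor CPU, PVC growth) are deliberately left out and fall through to `no-match`.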
Build a specific change with rationale tied to a measured value. Example to show the user:
Bottleneck: sink rate is 5,200 events/sec while source rate is 14,800 events/sec (gap = 9,600). `sink_batch_size_records` p95 is at 5,000 (matches the configured `max_batch_size`), so the sink is batch-bound, not throughput-bound on individual inserts.

Proposed change:

- `sink.max_batch_size`: 5000 -> 20000 (let the sink form larger batches; reduces per-insert overhead)
- `sink.max_delay_time`: 5s -> 2s (flush sooner so larger batches do not wait too long)
- `resources.sink.replicas`: 1 -> 2 (parallelize inserts; ClickHouse accepts concurrent INSERTs to the same table)

Expected effect: sink rate climbs to >= 14,000 events/sec within ~5 minutes, `stream_depth_ratio` falls below 0.5, `ingestor_backpressure_active` returns to 0.

Apply via: stop pipeline, PUT /resources for the replica change, POST /edit for the batch tuning, restart.
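The gap figure in the example proposal is plain arithmetic on the two measured rates. A short sketch of that calculation follows, using the example's own numbers. The helper name `gf_gap` is hypothetical.

```shell
# Hypothetical helper: quantify the source/sink gap quoted in a proposal.
# Args: source_rate sink_rate (events/sec)
gf_gap() {
  awk -v src="$1" -v snk="$2" 'BEGIN {
    if (src <= 0) { print "source rate is zero; nothing to compare"; exit }
    printf "gap=%d events/sec, sink keeps up with %.1f%% of source\n", src - snk, 100 * snk / src
  }'
}

gf_gap 14800 5200   # gap=9600 events/sec, sink keeps up with 35.1% of source
```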
Wait for the user to confirm before submitting any change.
Choose the endpoint by what is changing.
Resource changes (replicas, requests/limits, storage, NATS stream sizing): require the pipeline to be Stopped, then PUT /api/v1/pipeline/<id>/resources.
# Stop
curl -s -o /tmp/tune-stop.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/stop"
# Wait for Stopped
until [ "$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status'))")" = "Stopped" ]; do sleep 2; done
# Apply resource diff (full PipelineResources body required)
curl -s -o /tmp/tune-put.json -w "%{http_code}\n" -X PUT "$ENDPOINT/api/v1/pipeline/$PID/resources" \
-H 'Content-Type: application/json' \
--data @/tmp/tune-$PID-new-resources.json
# Resume
curl -s -o /tmp/tune-resume.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/resume"
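The `until` loop above polls forever if the pipeline never reaches Stopped. A bounded variant might look like the sketch below. `gf_wait_for` and `gf_status` are hypothetical names, and the attempt count and sleep interval are arbitrary choices rather than GlassFlow recommendations.

```shell
# Hypothetical helper: poll a status command until it prints the wanted state,
# giving up after a bounded number of attempts.
# Args: wanted_state max_attempts sleep_seconds status_command...
gf_wait_for() {
  want="$1"; attempts="$2"; pause="$3"; shift 3
  i=1
  state=""
  while [ "$i" -le "$attempts" ]; do
    state="$("$@")"
    [ "$state" = "$want" ] && return 0
    i=$((i + 1))
    sleep "$pause"
  done
  echo "timed out waiting for $want (last state: $state)" >&2
  return 1
}

# Status command mirroring the health check used above.
gf_status() {
  curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" \
    | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status'))"
}

# Usage: gf_wait_for Stopped 60 2 gf_status || exit 1
```

Passing the status command as an argument keeps the polling logic separate from the HTTP call, so the same helper works for any target state.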
Config changes (batch size, delay, dedup window, transforms, filter): use POST /api/v1/pipeline/<id>/edit with the full new pipeline JSON. /edit applies the config and resumes the pipeline in one call.
curl -s -o /tmp/tune-edit.json -w "%{http_code}\n" -X POST "$ENDPOINT/api/v1/pipeline/$PID/edit" \
-H 'Content-Type: application/json' \
--data @/tmp/tune-$PID-new-pipeline.json
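The curl calls above print the HTTP status code through `-w "%{http_code}"`, but nothing inspects it. One way to act on it is sketched below. The helper name `gf_check_http` is hypothetical. The 409 branch reflects the conflict response this document describes for `PUT /resources` on a pipeline that is not Stopped.

```shell
# Hypothetical helper: interpret the status code printed by curl -w "%{http_code}".
# Args: http_code step_name
gf_check_http() {
  case "$1" in
    2??) echo "$2: ok ($1)" ;;
    409) echo "$2: conflict ($1), pipeline is in the wrong state for this call" >&2; return 1 ;;
    *)   echo "$2: failed ($1)" >&2; return 1 ;;
  esac
}

# Usage with the edit call above (response body is kept for inspection on failure):
# CODE=$(curl -s -o /tmp/tune-edit.json -w "%{http_code}" -X POST "$ENDPOINT/api/v1/pipeline/$PID/edit" \
#   -H 'Content-Type: application/json' --data @/tmp/tune-$PID-new-pipeline.json)
# gf_check_http "$CODE" edit || cat /tmp/tune-edit.json
```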
Mixed changes (resources + config) require both calls in sequence: stop, PUT /resources, then /edit (which resumes) or /resume.
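The endpoint rules above reduce to a small lookup on what is changing. The sketch below prints the call sequence for each case. `gf_plan` is a hypothetical name, and the output is a plan to show the user, not something that executes the calls.

```shell
# Hypothetical helper: print the call sequence for a change, following the rules above.
# Args: has_resource_change has_config_change (each "yes" or "no")
gf_plan() {
  case "$1/$2" in
    yes/no)  printf '%s\n' "POST /stop" "PUT /resources" "POST /resume" ;;
    no/yes)  printf '%s\n' "POST /edit" ;;
    yes/yes) printf '%s\n' "POST /stop" "PUT /resources" "POST /edit" ;;
    *)       echo "nothing to apply" ;;
  esac
}

gf_plan yes yes   # mixed change: stop, PUT /resources, then /edit (which resumes)
```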
Wait for the pipeline to report Running again:
for i in {1..60}; do
STATUS=$(curl -s "$ENDPOINT/api/v1/pipeline/$PID/health" | python3 -c "import sys,json; print(json.load(sys.stdin).get('overall_status','?'))")
echo " attempt $i: overall_status=$STATUS"
case "$STATUS" in
Running) echo " ready"; break ;;
Failed) echo " pipeline failed after change; revert"; exit 1 ;;
Stopping|Stopped|Terminating) echo " pipeline did not resume; investigate"; exit 1 ;;
esac
sleep 5
done
Wait at least 5 minutes after the pipeline returns to Running so the rate windows refill, then re-pull the same baseline queries from step 1:
{
echo "=== source rate (events/sec, 5m) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_kafka_records_read_total{pipeline_id=\"$PID\"}[5m])"
echo "=== sink rate (events/sec, 5m) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=rate(${NS}_gfm_clickhouse_records_written_total{pipeline_id=\"$PID\"}[5m])"
echo "=== ingestor backpressure active ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_ingestor_backpressure_active{pipeline_id=\"$PID\"}"
echo "=== stream depth ratio ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=${NS}_gfm_stream_depth_ratio{pipeline_id=\"$PID\"}"
echo "=== sink batch size records (p50, p95) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.5, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
curl -sG "$PROM/api/v1/query" --data-urlencode "query=histogram_quantile(0.95, sum by (le) (rate(${NS}_gfm_sink_batch_size_records_bucket{pipeline_id=\"$PID\"}[5m])))"
echo "=== sink retries (outcome) ==="
curl -sG "$PROM/api/v1/query" --data-urlencode "query=sum by (outcome) (rate(${NS}_gfm_sink_retries_total{pipeline_id=\"$PID\"}[5m]))"
} > /tmp/tune-$PID-after.txt
Compare before and after, then report:
Change applied: <diff summary>
Observation window: 5 minutes
Symptom metric: <metric name>
Before: <value>
After: <value>
Other signals:
source rate: <before> -> <after>
sink rate: <before> -> <after>
stream_depth_ratio: <before> -> <after>
ingestor BP active: <before> -> <after>
Verdict: <improved | no change | regressed>
Next move: <accept | revert | try a different tunable>
If the metric improved as predicted, accept. If it did not, revert the change before trying anything else. Stacking unverified changes hides which one helped or hurt.
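The verdict line in the report template above can be derived mechanically from a before/after pair. The sketch below assumes that a change smaller than 5% of the baseline counts as "no change". That tolerance is an illustrative choice, not a GlassFlow rule. The helper name `gf_verdict` is hypothetical.

```shell
# Hypothetical helper: turn a before/after pair into the verdict used in the report.
# Assumption: changes smaller than 5% of the baseline count as "no change".
# Args: before after direction  ("up" if higher is better, "down" if lower is better)
gf_verdict() {
  awk -v b="$1" -v a="$2" -v dir="$3" 'BEGIN {
    delta = a - b
    if (dir == "down") delta = -delta
    tol = 0.05 * (b < 0 ? -b : b)
    if (delta > tol)
      print "improved"
    else if (delta < -tol)
      print "regressed"
    else
      print "no change"
  }'
}

# Illustrative values only, not measurements:
gf_verdict 5200 14000 up     # sink rate, higher is better: improved
gf_verdict 0.97 0.42 down    # stream_depth_ratio, lower is better: improved
```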
- `resources.nats.stream.max_age` and `.max_bytes` are immutable after pipeline creation. Changing them requires a new pipeline. Other NATS-stream fields are mutable. Surface this constraint when the proposed change involves these fields.
- Resource changes require the pipeline to be Stopped. `PUT /resources` returns 409 with a `current_status` error if you forget. Always stop first.
- `/edit` requires Running (or the appropriate transition target). It is not a universal "apply config" endpoint; it has lifecycle preconditions.
- Raising `sink.max_batch_size` shifts memory pressure to the sink Pods. If the sink limit is `memory: 512Mi`, a 50k batch with heavy rows may OOM. Raise `limits.memory` alongside.
- Use `rate()` over the same window for before/after comparison. Comparing raw counter values across the change is meaningless.
- `{namespace}` in metric names is the Helm release namespace, not the per-pipeline one. A non-default install changes every metric name.
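The warning about `sink.max_batch_size` and sink memory can be checked with a rough estimate before proposing a larger batch. The sketch below assumes memory for one batch is roughly rows times average row size. Real usage also includes buffers and serialization overhead, so treat the result as a lower bound. The helper name `gf_batch_mem` and the 8 KiB row size are illustrative assumptions.

```shell
# Hypothetical helper: rough in-flight memory for one sink batch.
# Assumption: memory ~ rows * average row size; actual usage will be higher.
# Args: batch_rows avg_row_bytes memory_limit_mib
gf_batch_mem() {
  awk -v rows="$1" -v bytes="$2" -v limit="$3" 'BEGIN {
    mib = rows * bytes / 1048576
    printf "batch ~%.0f MiB of %d MiB limit (%.0f%%)\n", mib, limit, 100 * mib / limit
  }'
}

# 50k rows of ~8 KiB each against a 512Mi limit
gf_batch_mem 50000 8192 512   # batch ~391 MiB of 512 MiB limit (76%)
```

An estimate that already consumes most of the limit suggests raising `limits.memory` in the same change.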