DP pipeline incident co-pilot. Given a DAG name or ECS task ARN, pulls logs, classifies the failure, and drafts Slack + postmortem text. Default account: edg-dp-dev (388892568521).
Slash command: /dp:incident
When a DAG fails or an ECS Fargate task errors out, this skill compresses the multi-tool investigation into one pass: pull the DAG code, fetch the ECS task logs, classify the failure, and draft a Slack message + postmortem skeleton the engineer can copy.
aws directly.
[OK], [WARN], [FAIL], [ERROR]). The bundled scripts already set PYTHONUTF8=1.
The skill accepts one of:
| Input shape | Detection | Example |
|---|---|---|
| DAG name | matches ^[a-z][a-z0-9_]*$ and not a 32-char hex | data_pier_six_db, dbt_run_events |
| ECS task ARN (full) | starts with arn:aws:ecs: | arn:aws:ecs:us-west-2:388892568521:task/cluster/e5d52f30... |
| ECS task ID (short) | 32 lowercase hex chars | e5d52f3015724d52bb2f0f72a81acd15 |
| cluster/task-id form | contains / | dbt-runner/e5d52f30... |
If the input is ambiguous, ask the user to confirm: "Is <input> a DAG name or an ECS task id?"
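The detection rules in the table can be sketched as a small classifier. This is a minimal illustration, not the skill's actual code: the function name `detect_input_shape` and the returned labels are hypothetical. The checks run from most to least specific, because a full ARN also contains `/` and a 32-character hex ID that starts with a letter also satisfies the DAG-name regex.

```python
import re

_HEX32 = re.compile(r"[0-9a-f]{32}")
_DAG_NAME = re.compile(r"[a-z][a-z0-9_]*")


def detect_input_shape(value: str) -> str:
    """Classify the input (hypothetical labels, most specific check first)."""
    value = value.strip()
    if value.startswith("arn:aws:ecs:"):
        return "task_arn"          # full ECS task ARN
    if _HEX32.fullmatch(value):
        return "task_id"           # short task ID: 32 lowercase hex chars
    if "/" in value:
        return "cluster_task"      # cluster/task-id form
    if _DAG_NAME.fullmatch(value):
        return "dag_name"
    return "ambiguous"             # caller should ask the user to confirm


if __name__ == "__main__":
    for sample in ("data_pier_six_db", "e5d52f3015724d52bb2f0f72a81acd15"):
        print(sample, "->", detect_input_shape(sample))
```

Anything that falls through to `ambiguous` is where the confirmation question above would be asked.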
The skill orchestrates up to six phases; Phase 5 runs only for certain failure patterns. Each phase calls into bundled lib code; the skill itself coordinates and synthesizes.
If input is a DAG name:
Use lib/dag_resolver.py:
- If the DPS_AIRFLOW_ROOT env var is set, look for <DPS_AIRFLOW_ROOT>/dags/<name>.py (and dags/**/<name>.py).
- Otherwise, search ImagineLearning/DataPlatformAirflowDags for the DAG name.
From the DAG file, extract:
- schedule_interval (or schedule)
- default_args (especially on_failure_callback)
- EcsRunTaskOperator task definitions: their task_id, cluster, task_definition, container_overrides (look for command arrays; these often name the dbt model or script being run)
- aws_conn_id if explicit
Record dag_path for the postmortem.
If input is a task ARN/ID:
- The task definition family (e.g. dbt-runner-events) often maps 1:1 to a DAG. Search DPS_AIRFLOW_ROOT/dags/ (or GitHub) for task_definition_family references.
- If a match is found, record dag_path and read the DAG file as in the DAG-name path above.
Invoke the bundled lib/ecs_log_fetcher.py CLI to pull and classify the task's CloudWatch logs:
python -c "import os,sys; from pathlib import Path; r=os.environ.get('CLAUDE_PLUGIN_ROOT'); p=(Path(r)/'lib/ecs_log_fetcher.py') if r and (Path(r)/'lib/ecs_log_fetcher.py').exists() else (sorted(Path.home().glob('.claude/plugins/cache/dp-marketplace/dp/*/lib/ecs_log_fetcher.py')) or sorted(Path.home().glob('.claude/plugins/cache/*/dp/*/lib/ecs_log_fetcher.py')))[-1]; sys.exit(__import__('subprocess').call([sys.executable, str(p), '<task-id>', '--summary-only'] + (['--region','us-west-2'])))"
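The one-liner above is dense, so here is the same lookup order spelled out as a readable sketch. The function name `locate_fetcher` is hypothetical, and unlike the one-liner (which would fail with an IndexError) this version raises FileNotFoundError when nothing is found. Note that both versions pick the lexicographically last cached version directory, which is not necessarily the highest semantic version.

```python
from pathlib import Path
from typing import Optional

REL = "lib/ecs_log_fetcher.py"


def locate_fetcher(plugin_root: Optional[str], home: Path) -> Path:
    """Find the bundled fetcher: CLAUDE_PLUGIN_ROOT first, then the plugin cache."""
    if plugin_root and (Path(plugin_root) / REL).exists():
        return Path(plugin_root) / REL
    cache = home / ".claude" / "plugins" / "cache"
    # Prefer the dp-marketplace cache; fall back to any marketplace holding dp.
    candidates = sorted(cache.glob(f"dp-marketplace/dp/*/{REL}")) or sorted(
        cache.glob(f"*/dp/*/{REL}")
    )
    if not candidates:
        raise FileNotFoundError(f"{REL} not found under {cache}")
    return candidates[-1]  # lexicographically last version directory
```

A caller would pass `os.environ.get("CLAUDE_PLUGIN_ROOT")` and `Path.home()`, then run the returned script with `<task-id> --summary-only --region us-west-2` as in the command above.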
Capture:
If logs can't be retrieved (>1h since stop with no fallback hit, etc.), record the error and continue — the rest of the synthesis is still useful.
SSO auto-login. When the boto3 calls hit an expired SSO session, the fetcher automatically runs aws sso login --profile <profile> (browser opens) and retries the AWS calls once. The engineer doesn't need to log in manually before invoking the command. If auto-login fails (no aws CLI, browser approval declined), the fetcher reports the failure and the skill stops with the manual command shown.
The bundled fetcher already runs failure classification. Use its output verbatim. The catalog (mirrored from lib/ecs_log_fetcher.py's _FAILURE_PATTERNS):
| Pattern | Signal | Suggested fix |
|---|---|---|
| OOM kill | OutOfMemory, MemoryError, OOMKilled, exit code 137, ResourceInitializationError | Bump task definition memory / memoryReservation. |
| Image pull failure | CannotPullContainerError, manifest unknown, image not found | Verify ECR tag exists; task execution role has ecr:BatchGetImage. |
| IAM / credentials | AccessDeniedException, NoCredentialsError, not authorized to perform, 403 | Inspect the task role (not execution role). |
| Env var missing | KeyError: '<var>', ValueError: ... environment | Add the key to the task definition's environment or secrets. |
| Network / DNS | Connection timed out, EndpointConnectionError, Could not resolve host | Check VPC subnets, NAT gateway, security groups. |
| Snowflake referential issue | does not exist or not authorized, 002003 (42S02) | Confirm the object exists and the role has grants in the target env. |
| Snowflake SQL compile error | 001003 (42000), SQL compilation error | Inspect compiled SQL at the cited line; common: stray ; inside a Jinja block. |
| dbt run completed with errors | Done. PASS=N WARN=N ERROR=>0 SKIP=N | Each Database Error in model <name> block above names the failing model. |
| Generic Python crash | Traceback (most recent call last): | Surface the bottom-most exception. |
| Exit 0, no errors | stop_code=EssentialContainerExited and no errors | No fix; report runtime. |
| UNKNOWN | None match | Print last 15 log lines for inspection. |
If you spot a recurring failure mode that isn't in the catalog, leave a [NOTE] line at the end suggesting a new pattern entry. Don't invent classifications inline.
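To make the catalog concrete, here is a minimal sketch of how signal-based classification could work. It is an illustration only: the real patterns live in `_FAILURE_PATTERNS` in lib/ecs_log_fetcher.py, whose structure is not shown here, so the `PATTERNS` list, the `classify` function, and the first-match-wins ordering are all assumptions. Only a subset of the rows is included.

```python
import re
from typing import List, Tuple

# Hypothetical subset of the catalog, ordered specific-to-generic (assumed).
PATTERNS: List[Tuple[str, "re.Pattern[str]"]] = [
    ("OOM kill", re.compile(r"OutOfMemory|MemoryError|OOMKilled|exit code 137")),
    ("Image pull failure", re.compile(r"CannotPullContainerError|manifest unknown")),
    ("Snowflake SQL compile error", re.compile(r"001003 \(42000\)|SQL compilation error")),
    ("Generic Python crash", re.compile(r"Traceback \(most recent call last\):")),
]


def classify(log_lines: List[str]) -> Tuple[str, List[str]]:
    """Return (pattern name, evidence lines); the first catalog entry that hits wins."""
    for name, regex in PATTERNS:
        evidence = [line for line in log_lines if regex.search(line)]
        if evidence:
            return name, evidence[:5]      # top 5 evidence lines
    return "UNKNOWN", log_lines[-15:]       # last 15 lines for inspection
```

Putting the generic traceback entry last keeps a Snowflake error raised from Python from being reported as a plain crash. When the skill runs, the fetcher's own output is still the one to use verbatim.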
This phase only runs when all of the following are true:
- The failure pattern is dbt run completed with errors, Snowflake SQL compile error, or Snowflake referential issue.
- The error names a source file (models/**/*.sql for dbt errors, or snowchange/sql/**/V*.sql for Snowchange errors).
- The error cites a line number (e.g. syntax error line 89 at position 23).
If any condition is false, skip Phase 5 and go straight to Phase 6.
When the conditions are met:
Parse the file path and line number from the error block. Common shapes:
- Database Error in model <name> (models/<rel-path>.sql) -> file = models/<rel-path>.sql, model name = <name>
- compiled code at target/run/dw_dbt/<rel-path> -> file = <rel-path> (the compiled-output path; the source is in models/)
- syntax error line <N> -> cited_line = <N> (1-indexed)
Fetch the source file from BOTH develop and master branches of ImagineLearning/DataPlatformSnowflake via mcp__plugin_il_github__get_file_contents:
mcp__plugin_il_github__get_file_contents
owner: ImagineLearning
repo: DataPlatformSnowflake
path: dw_dbt/app/snowflake/<rel-path> (the compiled paths drop dw_dbt/app/snowflake/ — re-add it)
ref: refs/heads/develop
Repeat with ref: refs/heads/master. If a branch returns 404 (file doesn't exist there), record that — it's a meaningful signal.
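The parsing step described earlier (model name, file path, cited line) might look like the sketch below. The names `parse_error_block` and `Cited` are hypothetical, and the sketch assumes the `models/...` path from the error sits directly under the `dw_dbt/app/snowflake/` prefix in the repository; check that against the actual repo layout before relying on it.

```python
import re
from typing import NamedTuple, Optional

REPO_PREFIX = "dw_dbt/app/snowflake/"  # prefix the compiled paths drop

_MODEL = re.compile(r"Database Error in model (\S+) \((models/[^)]+\.sql)\)")
_LINE = re.compile(r"syntax error line (\d+)")


class Cited(NamedTuple):
    model: str
    repo_path: str           # path to request from the GitHub MCP (assumed layout)
    line: Optional[int]      # 1-indexed; None when the error cites no line


def parse_error_block(text: str) -> Optional[Cited]:
    """Extract the failing model, its repo path, and the cited line, if present."""
    model = _MODEL.search(text)
    if model is None:
        return None
    line = _LINE.search(text)
    return Cited(
        model=model.group(1),
        repo_path=REPO_PREFIX + model.group(2),
        line=int(line.group(1)) if line else None,
    )
```

A `None` result, or a result with `line=None`, corresponds to the case where Phase 5's preconditions are not met.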
Surface the cited line + 10 lines of context (5 before, 5 after, clamped to file bounds). For each branch, print:
----- develop @ <commit-sha-prefix> -----
<line N-5> ...
<line N-4> ...
... (lines N-3, N-2, N-1)
>> <line N> <the cited line, marked with `>>`>
<line N+1> ...
... (lines N+2..N+5)
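The clamped context window can be sketched as follows. The function name `context_snippet` and the exact column layout (marker, line number, text) are assumptions; the template above only fixes the `>>` marker on the cited line and the 5-before/5-after window.

```python
from typing import List


def context_snippet(lines: List[str], cited: int, radius: int = 5) -> str:
    """Render the lines around a 1-indexed cited line, marking it with '>>'."""
    if not 1 <= cited <= len(lines):
        raise ValueError(f"line {cited} is outside the file (1..{len(lines)})")
    start = max(1, cited - radius)            # clamp to the top of the file
    end = min(len(lines), cited + radius)     # clamp to the bottom of the file
    rendered = []
    for number in range(start, end + 1):
        marker = ">>" if number == cited else "  "
        rendered.append(f"{marker} {number:>5} {lines[number - 1]}")
    return "\n".join(rendered)
```

Raising on an out-of-range line, rather than clamping it, surfaces the case where the cited line refers to compiled SQL that is longer than the source file.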
Compare develop vs master:
Propose a fix based on the failure pattern and the snippet:
- unexpected ';' -> suggest removing the stray semicolon at the cited position.
- does not exist or not authorized (Snowflake referential) -> the object name in the error message; suggest checking grants or env-aware sourcing.
- Compilation Error: 'env_var' returned 'None' -> a DBT_* var is unset.
Mark the proposal as suggested, never as authoritative. The engineer reviews and applies.
Add a Section 1 sub-block to Phase 6 titled Source file analysis that contains the snippet(s) and the suggested fix. The Slack draft (Section 2) summarizes the file and line; the postmortem (Section 3) links to the GitHub URL of the cited line on both branches.
If the GitHub MCP is unavailable or the fetch fails, skip Phase 5 silently and continue. Note the skip as [NOTE] Could not fetch source file from GitHub: <reason> at the end of Phase 6's output.
Produce three sections in this order. Print all three in one pass — engineer copies what they need.
=========================================================================
Incident Summary
=========================================================================
When: <stop time UTC, "X minutes ago" if recent>
Where: <cluster> :: <task definition family>
DAG: <dag_name> (file: <dag_path>) [or "unknown" if not resolved]
Schedule: <schedule_interval> [if DAG resolved]
Failure: <pattern> (confidence: <high|medium|low>)
Stop code: <stop_code>
Stop reason: <stopped_reason>
Evidence (top 5 log lines that drove the classification):
<ts> <line>
<ts> <line>
...
Suggested fix:
1. <action 1>
2. <action 2>
3. <action 3>
If the DAG is resolved, also note: "Failure callback: <configured | NOT CONFIGURED — failures may not page anyone>" — this is the IL convention check.
If Phase 5 ran (source file fetched from develop + master), append a Source file analysis block right after the suggested fix:
Source file analysis:
File: <rel-path>
Line: <N>
Branches checked: develop, master
----- develop @ <sha> -----
<line N-5>
<line N-4>
...
>> <line N> <-- cited
<line N+1>
...
----- master @ <sha> -----
<line N-5>
...
>> <line N>
...
Diff (develop vs master, around the cited line):
<unified diff if branches differ; otherwise "Identical on both branches">
Suggested edit (engineer to verify and apply):
<one-line proposal, e.g. "Remove stray ';' on line 89">
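The Diff line in the block above can be produced with the standard library's `difflib`. This is a minimal sketch: the function name `branch_diff` is hypothetical, and the diff direction (develop as the "from" side, master as the "to" side) is an assumption, since the template only says "develop vs master".

```python
import difflib
from typing import List


def branch_diff(develop: List[str], master: List[str]) -> str:
    """Unified diff of two snippets, or a fixed message when they match."""
    diff = list(
        difflib.unified_diff(
            develop, master, fromfile="develop", tofile="master", lineterm=""
        )
    )
    return "\n".join(diff) if diff else "Identical on both branches"
```

Passing only the context-window lines from each branch, rather than the whole files, keeps the diff focused on the cited line.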
A copy-pasteable message for #data-platform-incidents (or whichever channel the team uses). Match the IL DP team's typical incident format:
=========================================================================
Slack Draft (for #data-platform-incidents)
=========================================================================
:rotating_light: <DAG name or task family> failed at <time UTC>
*What:* <one-line summary of the failure pattern>
*Cluster:* <cluster>
*Task ARN:* <full ARN>
*Logs:* <log group> :: <log stream>
*Suspected cause:* <pattern + confidence>
*Next step:* <first item from suggested fix>
cc <inferred-from-DAG-owner-comment-or-CODEOWNERS, e.g. @data-platform-oncall>
If the DAG file's first comment or default_args includes an owner field, use that for the cc line. Otherwise mark cc <add-oncall-here> so the engineer fills it in.
If the failure pattern includes raw user IDs, emails, or payloads in the evidence lines, redact them in the Slack draft (the in-terminal Section 1 keeps them so the engineer can investigate). Use <redacted-pii> placeholders.
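A minimal redaction pass over evidence lines might look like this. It is a sketch under stated assumptions: the email regex is deliberately simple, and the `user_id=` key is a hypothetical example of how a raw ID could appear in a log line; real logs would need patterns matched to their actual field names and payload shapes.

```python
import re

PLACEHOLDER = "<redacted-pii>"

_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
_USER_ID = re.compile(r"\buser_id=\S+")  # hypothetical key name


def redact(line: str) -> str:
    """Replace emails and user_id values with the Slack-safe placeholder."""
    line = _EMAIL.sub(PLACEHOLDER, line)
    return _USER_ID.sub(f"user_id={PLACEHOLDER}", line)


if __name__ == "__main__":
    print(redact("load failed for jane.doe@example.com user_id=12345"))
```

Only the Slack draft should pass through `redact`; the in-terminal Section 1 keeps the raw lines.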
A markdown skeleton ready to paste into Confluence under the team's standard Tickets and Bugs > <TICKET> folder convention (the dp:publish-ticket-confluence skill handles publishing if the engineer fills it in).
=========================================================================
Postmortem Skeleton (markdown)
=========================================================================
# <DP-TICKET> - <DAG name> failure on <date>
## Impact
<engineer fills this in: who/what was affected, downstream tables that didn't refresh, etc.>
## Timeline (UTC)
- **<stop time>** Task `<task-id>` stopped with `<stop_code>` on cluster `<cluster>`.
- **<detection time — engineer fills>** Failure detected via <Slack alert | manual check | downstream consumer escalation>.
- **<mitigation time — engineer fills>** <first fix action taken>.
## Root Cause
<engineer fills — start from the failure pattern below as the hypothesis>
Failure pattern (auto-classified): **<pattern>**
Confidence: **<high|medium|low>**
Evidence:
\`\`\`
<top 5 log lines>
\`\`\`
## What went wrong
<engineer fills — was it deploy, data, infra, dependency, etc.>
## What went right
<engineer fills — alerting fired, oncall responded, mitigation was fast, etc.>
## Action Items
- [ ] <suggested fix #1 from above>
- [ ] <suggested fix #2 from above>
- [ ] Add a regression test / monitor for this pattern (engineer to specify)
## Links
- Failed task: <full ARN>
- Log group: <group> :: <stream>
- DAG source: https://github.com/ImagineLearning/DataPlatformAirflowDags/blob/main/<dag_path>
- ECS console: https://us-west-2.console.aws.amazon.com/ecs/v2/clusters/<cluster>/tasks/<task-id>
[if Phase 5 ran, also include:]
- Failing source on develop: https://github.com/ImagineLearning/DataPlatformSnowflake/blob/develop/<rel-path>#L<N>
- Failing source on master: https://github.com/ImagineLearning/DataPlatformSnowflake/blob/master/<rel-path>#L<N>
=========================================================================
- If the task succeeded (exit 0, no errors), still print Section 1 with Failure: none — task succeeded so the engineer has confirmation. Skip Sections 2 and 3.
- If logs can't be retrieved, print Logs: not retrievable — <reason>. Still print Slack and postmortem drafts, marked with <add evidence here> placeholders.
- If the DAG can't be resolved, print DAG: unknown — task family <family> matches multiple DAGs: [list] or DAG: unknown — no matching DAG found. Slack and postmortem drafts use the task identity instead.
- If the SSO session has expired, ecs_log_fetcher.py auto-runs aws sso login --profile <profile> (which pops a browser), waits for the engineer to approve, and retries the AWS calls once. If auto-login fails (no aws CLI on PATH, browser approval not given), the script prints the manual command and stops.
- Walking models/** to find the failing model's upstream ref() and source() calls, and the downstream models that ref it, is out of scope for v1.
| Need | Skill |
|---|---|
| AWS SSO profile setup for the AWS calls this skill makes | il:configure-aws-cli-multi-account |
| Search OpenSearch logs (non-ECS) | il:search-opensearch-logs |
| Publish the postmortem skeleton to Confluence once filled in | dp:publish-ticket-confluence |
npx claudepluginhub imaginelearning/dp-claude-plugin --plugin dp