From skillry-data-ml-ai-engineering
Use when you need to review or design an ETL/ELT data pipeline for idempotency, incremental loads, orchestration (Airflow/Dagster/cron), backfill safety, partitioning, and failure recovery.
Slash command: /skillry-data-ml-ai-engineering:311-data-pipeline-review
Review or design a batch or streaming data pipeline so that it is idempotent, restartable, and correct under retries and backfills. Cover extraction, transformation, load semantics, orchestration (Airflow, Dagster, Prefect, or cron), partitioning, watermarking for incremental loads, and recovery from partial failure. The goal is a pipeline that produces the same output when re-run on the same input, never double-writes, and can backfill a historical window without corrupting current data.
# Locate orchestration definitions
find . -path '*/dags/*.py' -o -name 'dagster*.py' -o -name '*_flow.py' 2>/dev/null | grep -v __pycache__ | head -30
ls dbt_project.yml profiles.yml 2>/dev/null
grep -rn "schedule_interval\|@daily\|cron\|ScheduleDefinition\|@schedule" . --include="*.py" | head -20
Identify each task, its upstream inputs, its output table or file, and the trigger cadence.
# Look for non-idempotent appends vs idempotent upsert/overwrite
grep -rn "INSERT INTO\|\.to_sql(\|if_exists=\|COPY INTO\|MERGE INTO\|ON CONFLICT" . --include="*.py" --include="*.sql" | head -30
# Truncate/overwrite patterns
grep -rn "TRUNCATE\|DELETE FROM\|overwrite\|replace" . --include="*.py" --include="*.sql" | head -20
Confirm each load is one of: full overwrite of a partition, MERGE/upsert on a key, or insert into a fresh partition that is atomically swapped. A bare INSERT/append with retries will duplicate rows.
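The difference between a bare append and a keyed upsert under retry can be sketched with an in-memory SQLite database. The table names, columns, and helper functions below are illustrative assumptions, not part of any reviewed pipeline; SQLite's `ON CONFLICT ... DO UPDATE` stands in for a warehouse `MERGE`.

```python
import sqlite3


def make_db():
    conn = sqlite3.connect(":memory:")
    # Append target has no key; upsert target is keyed on order_id.
    conn.execute("CREATE TABLE orders_append (order_id INTEGER, status TEXT)")
    conn.execute(
        "CREATE TABLE orders_upsert (order_id INTEGER PRIMARY KEY, status TEXT)"
    )
    return conn


def load_append(conn, rows):
    # Bare append: every re-run inserts the same rows again.
    with conn:
        conn.executemany(
            "INSERT INTO orders_append (order_id, status) VALUES (?, ?)", rows
        )


def load_upsert(conn, rows):
    # Upsert on the key: a re-run updates in place instead of duplicating.
    with conn:
        conn.executemany(
            "INSERT INTO orders_upsert (order_id, status) VALUES (?, ?) "
            "ON CONFLICT(order_id) DO UPDATE SET status = excluded.status",
            rows,
        )


def count(conn, table):
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = make_db()
batch = [(1, "paid"), (2, "shipped")]
for _ in range(2):  # the original attempt plus one simulated retry
    load_append(conn, batch)
    load_upsert(conn, batch)

print(count(conn, "orders_append"), count(conn, "orders_upsert"))
```

After the simulated retry the append table holds every row twice, while the keyed table still holds one row per `order_id`.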
# Watermark / high-water-mark tracking
grep -rn "watermark\|last_run\|updated_at\|incremental\|max(\|execution_date\|data_interval" . --include="*.py" --include="*.sql" | head -25
The cursor must use an event/business timestamp (not wall-clock), tolerate late data with a lookback window, and persist the watermark only after a successful load.
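A minimal in-memory simulation of these three rules, assuming a hypothetical `WatermarkStore` class and rows represented as dicts with an `updated_at` event timestamp. In this variant the watermark advances to the newest event timestamp actually loaded, which is one possible way to keep wall-clock time out of the cursor.

```python
from datetime import datetime, timedelta, timezone


class WatermarkStore:
    """Hypothetical in-memory stand-in for a persisted watermark table."""

    def __init__(self, initial):
        self._value = initial

    def get(self):
        return self._value

    def set(self, value):
        self._value = value


def run_increment(store, source, sink, lookback, fail_load=False):
    """Pull rows newer than (watermark - lookback) and upsert them into sink."""
    last = store.get()
    start = last - lookback
    batch = [r for r in source if r["updated_at"] >= start]
    if fail_load:
        raise RuntimeError("simulated load failure")
    for row in batch:
        sink[row["order_id"]] = row  # keyed write, so re-pulled rows do not duplicate
    if batch:
        # Advance to the newest event timestamp seen, never to wall-clock time.
        store.set(max(last, max(r["updated_at"] for r in batch)))


t0 = datetime(2026, 6, 1, 12, 0, tzinfo=timezone.utc)
store = WatermarkStore(t0)
sink = {}
source = [
    {"order_id": 1, "updated_at": t0 + timedelta(minutes=30)},
    # Late row: stamped before the watermark but arriving only now.
    {"order_id": 2, "updated_at": t0 - timedelta(minutes=45)},
]

try:
    run_increment(store, source, sink, timedelta(hours=2), fail_load=True)
except RuntimeError:
    pass
watermark_after_failure = store.get()  # unchanged, so the retry re-pulls the window

run_increment(store, source, sink, timedelta(hours=2))
```

The failed run leaves the watermark where it was, and the retry picks up both the new row and the late one because the lookback window reaches behind the stored cursor.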
# Retries, idempotency keys, catchup behavior, dependencies
grep -rn "retries\|retry_delay\|catchup\|depends_on_past\|max_active_runs\|wait_for_downstream" . --include="*.py" | head -25
Confirm retries are bounded, catchup/backfill behavior is intentional, and concurrent runs are limited so two runs cannot write the same partition.
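For pipelines run from cron, where no orchestrator supplies a retry policy, bounded retries might look like the sketch below. `run_with_retries` and `flaky_load` are hypothetical names, and the injectable `sleep` parameter exists only so the example runs without waiting.

```python
import time


def run_with_retries(task, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call task() up to max_attempts times, doubling the delay between attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise  # bounded: give up and surface the failure
            sleep(base_delay * 2 ** (attempt - 1))


calls = []
delays = []


def flaky_load():
    # Fails twice with a transient error, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient failure")
    return "loaded"


result = run_with_retries(
    flaky_load, max_attempts=3, base_delay=1.0, sleep=delays.append
)
```

A wrapper like this is only safe around an idempotent load; wrapped around a bare append it would duplicate rows on each retry.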
Confirm there is a parameterized way to reprocess a date range that targets only the affected partitions, runs against a staging location or with MERGE, and never deletes live data before the replacement is validated.
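One possible shape for such a backfill, sketched against an in-memory SQLite table. The `orders` schema, the `stage` callback, and the non-empty check standing in for validation are all assumptions made for illustration; a real pipeline would validate against its own rules.

```python
import sqlite3
from datetime import date, timedelta


def days_between(start, end):
    """Return every date in the inclusive range [start, end]."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


def replace_partition(conn, day, staged_rows):
    """Swap one day's rows for staged_rows, validating before touching live data."""
    if not staged_rows:
        raise ValueError(f"no replacement rows for {day}; live partition untouched")
    with conn:  # one transaction: the DELETE and INSERT commit or roll back together
        conn.execute("DELETE FROM orders WHERE order_date = ?", (day.isoformat(),))
        conn.executemany(
            "INSERT INTO orders (order_id, order_date, amount) VALUES (?, ?, ?)",
            [(oid, day.isoformat(), amount) for oid, amount in staged_rows],
        )


def backfill(conn, start, end, stage):
    """Reprocess only the partitions in [start, end]; stage(day) supplies replacements."""
    for day in days_between(start, end):
        replace_partition(conn, day, stage(day))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, order_date TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "2026-06-01", 10.0), (2, "2026-06-02", 20.0), (3, "2026-06-03", 30.0)],
)
conn.commit()


def stage(day):
    # Hypothetical recomputed rows for the affected day: (order_id, amount).
    return [(2, 25.0), (4, 5.0)]


for _ in range(2):  # running the backfill twice leaves the same result
    backfill(conn, date(2026, 6, 2), date(2026, 6, 2), stage)

rows = conn.execute(
    "SELECT order_id, order_date, amount FROM orders ORDER BY order_id"
).fetchall()
```

Only the 2026-06-02 partition changes; the neighbouring days are never read or written, and an empty replacement raises before any live row is deleted.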
Every load is a MERGE/upsert on a stable key or an atomic partition swap, not a bare append under retry.
Retries are bounded via retries and retry_delay.
max_active_runs / concurrency prevents two runs writing the same partition.
catchup / depends_on_past is set deliberately, not left at an accidental default.
Backfills run against staging or with MERGE, never destructively on live tables.
Idempotent partitioned upsert (SQL):
-- Load a single day's partition idempotently using MERGE.
-- Safe to re-run: matched rows update, new rows insert, no duplicates.
MERGE INTO analytics.orders AS t
USING staging.orders_2026_06_01 AS s
  ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET
  status = s.status, amount = s.amount, updated_at = s.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, status, amount, updated_at)
  VALUES (s.order_id, s.status, s.amount, s.updated_at);
Idempotent incremental extract with watermark and lookback (Python):
from datetime import datetime, timedelta, timezone


def now_utc():
    """Current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def incremental_window(last_watermark, now, lookback=timedelta(hours=2)):
    """Return [start, end) for an incremental pull.

    Re-pulls a lookback window so late-arriving rows are captured.
    Caller must MERGE (not append) to stay idempotent.
    """
    start = last_watermark - lookback
    end = now
    return start, end


def run(conn, store):
    # conn, store, and upsert_orders are placeholders for your own
    # database client, watermark store, and MERGE-based loader.
    last = store.get_watermark("orders")  # persisted business timestamp
    start, end = incremental_window(last, now_utc())
    rows = conn.fetch(
        "SELECT * FROM source.orders "
        "WHERE updated_at >= %s AND updated_at < %s",
        (start, end),
    )
    upsert_orders(conn, rows)           # MERGE, not INSERT
    store.set_watermark("orders", end)  # persist ONLY after success
Airflow task skeleton with bounded retries and no accidental catchup:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {"retries": 3, "retry_delay": timedelta(minutes=5)}

with DAG(
    dag_id="orders_incremental",
    # Airflow 2.4+ accepts `schedule` in place of this deprecated argument.
    schedule_interval="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,      # do not silently backfill on deploy
    max_active_runs=1,  # never two writers on one partition
    default_args=default_args,
) as dag:
    # `run` is the loader from the previous example; it expects conn and
    # store, which can be supplied through op_kwargs.
    load = PythonOperator(task_id="load_orders", python_callable=run)
A bare INSERT inside a task with retries: every retry duplicates rows.
A watermark taken from now() / wall-clock instead of the event timestamp, dropping late data.
catchup=True left on by default, triggering an unintended multi-month backfill at deploy time.
A backfill that DELETEs live partitions first, leaving a gap if the reload fails.
Produce a structured report with:
file:line | issue | risk | concrete fix.
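A small sketch of how findings could be rendered in the `file:line | issue | risk | concrete fix` layout. The `Finding` dataclass, the `format_report` helper, and the sample path and line number are hypothetical and serve only to show the row shape.

```python
from dataclasses import dataclass


@dataclass
class Finding:
    file: str
    line: int
    issue: str
    risk: str
    fix: str


def format_report(findings):
    """Render findings as 'file:line | issue | risk | concrete fix' rows."""
    header = "file:line | issue | risk | concrete fix"
    rows = [
        f"{f.file}:{f.line} | {f.issue} | {f.risk} | {f.fix}" for f in findings
    ]
    return "\n".join([header, *rows])


report = format_report(
    [
        # Made-up finding for illustration only.
        Finding(
            "dags/orders.py",
            42,
            "bare INSERT under retries",
            "duplicate rows",
            "MERGE on order_id",
        ),
    ]
)
print(report)
```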