Stats
Actions
Tags
From example-skills
Builds data ingestion pipelines for batch and streaming data from files, APIs, and databases. Covers extraction strategies, format normalization, deduplication, validation, and staging patterns.
How this skill is triggered — by the user, by Claude, or both
Slash command
/example-skills:data-ingestion-pipeline
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
Extract, validate, and load data from diverse sources into target systems.
Extract, validate, and load data from diverse sources into target systems.
Sources → Extract → Validate → Transform → Stage → Load → Verify
   │         │         │           │         │      │       │
   │         │         │           │         │      │       └─ Row counts match
   │         │         │           │         │      └─ Write to target
   │         │         │           │         └─ Staging table/file
   │         │         │           └─ Normalize, enrich, deduplicate
   │         │         └─ Schema validation, business rules
   │         └─ Pull from source
   └─ APIs, files, databases, streams
from pathlib import Path
import json
import csv
import yaml
class FileExtractor:
    """Load records from a local JSON, YAML, or CSV file.

    The parser is chosen from the file extension, compared case-insensitively.
    Whatever the parser returns is normalized to a list of records:
    - a top-level list is returned as-is;
    - an empty document (``None``) becomes ``[]``;
    - any other single value is wrapped in a one-element list.
    """

    @staticmethod
    def _read_csv(path: Path) -> list[dict]:
        # newline="" lets the csv module handle quoted fields that contain
        # line breaks. The context manager closes the handle as soon as the
        # rows are read, instead of leaving it to garbage collection.
        with path.open(newline="", encoding="utf-8") as handle:
            return list(csv.DictReader(handle))

    # Lower-cased file suffix -> callable taking a Path and returning parsed data.
    # yaml.safe_load (never yaml.load) keeps untrusted YAML from constructing
    # arbitrary Python objects.
    PARSERS = {
        ".json": lambda p: json.loads(p.read_text(encoding="utf-8")),
        ".yaml": lambda p: yaml.safe_load(p.read_text(encoding="utf-8")),
        ".yml": lambda p: yaml.safe_load(p.read_text(encoding="utf-8")),
        ".csv": lambda p: FileExtractor._read_csv(p),
    }

    def extract(self, path: Path) -> list[dict]:
        """Parse *path* and return its contents as a list of records.

        Args:
            path: File to read. A ``str`` is also accepted and converted to
                ``Path``.

        Returns:
            The parsed records. CSV rows are dicts of strings keyed by the
            header row.

        Raises:
            ValueError: If the file extension has no registered parser.
        """
        path = Path(path)
        parser = self.PARSERS.get(path.suffix.lower())
        if parser is None:
            raise ValueError(f"Unsupported format: {path.suffix}")
        data = parser(path)
        if data is None:
            # An empty YAML document (or JSON `null`) means "no records".
            return []
        return data if isinstance(data, list) else [data]
import httpx
async def extract_paginated(
    base_url: str,
    params: "dict | None" = None,
    per_page: int = 100,
    max_pages: "int | None" = None,
) -> list[dict]:
    """Fetch every page of a page-numbered JSON API and return all records.

    Sends GET requests with ``page=1, 2, ...`` and ``per_page`` added to the
    query string. Stops when a page comes back empty, or after *max_pages*
    pages when that limit is given.

    Args:
        base_url: Endpoint to request.
        params: Extra query parameters sent with every request. The dict is
            copied, not mutated.
        per_page: Page size requested from the API.
        max_pages: Optional upper bound on pages fetched. It guards against
            APIs that ignore ``page`` and keep returning the same data.

    Returns:
        All records from all fetched pages, in page order.

    Raises:
        httpx.HTTPStatusError: On a non-2xx response (via ``raise_for_status``).
        ValueError: If a non-empty response body is neither a JSON list nor
            an object containing an ``items`` or ``results`` key.
    """
    extra_params = dict(params or {})
    all_records: list[dict] = []
    page = 1
    async with httpx.AsyncClient() as client:
        while max_pages is None or page <= max_pages:
            response = await client.get(
                base_url,
                params={**extra_params, "page": page, "per_page": per_page},
            )
            response.raise_for_status()
            data = response.json()
            if not data:
                # Empty list / object / null body: nothing more to fetch.
                break
            if isinstance(data, list):
                records = data
            elif isinstance(data, dict) and ("items" in data or "results" in data):
                # "items" takes precedence over "results" when both exist.
                records = data["items"] if "items" in data else data["results"]
            else:
                # Do not fall back to extending with the object itself: that
                # adds its keys as "records" and never terminates if the API
                # keeps returning the same object.
                raise ValueError(
                    f"Unexpected response shape from {base_url} (page {page}): "
                    "expected a list or an object with 'items' or 'results'"
                )
            if not records:
                break
            all_records.extend(records)
            page += 1
    return all_records
import asyncpg
async def extract_from_db(
    dsn: str,
    query: str,
    batch_size: int = 1000,
    *,
    params: tuple = (),
):
    """Stream the rows of *query* as dicts, using an asyncpg cursor.

    asyncpg only permits cursors inside a transaction, so the cursor is
    iterated within ``conn.transaction()``. The connection is closed when
    the generator finishes, raises, or is closed early by the consumer.

    Args:
        dsn: PostgreSQL connection string passed to ``asyncpg.connect``.
        query: SQL to run. Bind values through *params* ($1, $2, ...
            placeholders) rather than formatting them into this string.
        batch_size: Number of rows prefetched per round-trip (``prefetch``).
        params: Positional query arguments for the placeholders in *query*.

    Yields:
        One dict per result row, keyed by column name.
    """
    conn = await asyncpg.connect(dsn)
    try:
        async with conn.transaction():
            async for record in conn.cursor(query, *params, prefetch=batch_size):
                yield dict(record)
    finally:
        await conn.close()
from dataclasses import dataclass
@dataclass()
class ValidationResult:
    """Outcome of a validation pass: accepted records and rejected records.

    Both fields are plain lists that validators append to.
    """

    # Records that passed every check.
    valid: list[dict]
    # Rejected records, each paired with a human-readable error message.
    invalid: list[tuple[dict, str]]
def validate_records(records: list[dict], schema: dict) -> ValidationResult:
    """Check each record against *schema* and split them into valid / invalid.

    *schema* keys read here (both optional):

    * ``"required"`` — field names that must be present with a non-None value.
    * ``"fields"`` — mapping of field name to a rules dict. Supported rules:
      ``"type"`` is a type or tuple of types, checked with ``isinstance``;
      ``"max_length"`` is compared against ``len(str(value))``. Field rules
      apply only when the field is present and not None.

    Returns:
        A ValidationResult. Each invalid entry pairs the record with all of
        its error messages joined by ``"; "``.
    """
    # The schema does not change between records, so read it once.
    required_fields = schema.get("required", [])
    field_rules = schema.get("fields", {})

    result = ValidationResult(valid=[], invalid=[])
    for record in records:
        # A missing key and an explicit None both count as "missing".
        errors = [
            f"Missing required field: {field}"
            for field in required_fields
            if record.get(field) is None
        ]
        for field, rules in field_rules.items():
            value = record.get(field)
            if value is None:
                continue
            expected = rules.get("type")
            # NOTE: isinstance(True, int) is True, so bools satisfy an int rule.
            if expected is not None and not isinstance(value, expected):
                # isinstance accepts a tuple of types, but a tuple has no
                # __name__, so name each member instead.
                types = expected if isinstance(expected, tuple) else (expected,)
                label = " or ".join(t.__name__ for t in types)
                errors.append(f"{field}: expected {label}")
            max_length = rules.get("max_length")
            if max_length is not None and len(str(value)) > max_length:
                errors.append(f"{field}: exceeds max length {max_length}")
        if errors:
            result.invalid.append((record, "; ".join(errors)))
        else:
            result.valid.append(record)
    return result
# Default allowed values for the example rules in apply_business_rules.
# Built once at import time rather than once per record.
DEFAULT_VALID_ORGANS = frozenset({"I", "II", "III", "IV", "V", "VI", "VII", "META"})
DEFAULT_VALID_STATUSES = frozenset(
    {"LOCAL", "CANDIDATE", "PUBLIC_PROCESS", "GRADUATED", "ARCHIVED"}
)


def apply_business_rules(
    records: list[dict],
    *,
    valid_organs=DEFAULT_VALID_ORGANS,
    valid_statuses=DEFAULT_VALID_STATUSES,
) -> ValidationResult:
    """Apply example domain rules and split records into valid / invalid.

    Args:
        records: Records to check. Their ``"organ"`` and ``"status"`` fields
            are read with ``dict.get``, so a missing field is checked as None.
        valid_organs: Allowed ``"organ"`` values.
        valid_statuses: Allowed ``"status"`` values.

    Returns:
        A ValidationResult. Each invalid entry pairs the record with all of
        its error messages joined by ``"; "``.
    """
    result = ValidationResult(valid=[], invalid=[])
    for record in records:
        errors = []
        organ = record.get("organ")
        if organ not in valid_organs:
            errors.append(f"Invalid organ: {organ}")
        # Membership check only: status *transitions* are not validated here.
        status = record.get("status")
        if status not in valid_statuses:
            errors.append(f"Invalid status: {status}")
        if errors:
            result.invalid.append((record, "; ".join(errors)))
        else:
            result.valid.append(record)
    return result
def deduplicate(records: list[dict], key_fields: list[str]) -> list[dict]:
    """Drop every record whose key repeats an earlier record's key.

    The key is the tuple of ``record.get(name)`` for each name in
    *key_fields*, so a missing field and an explicit None compare equal.
    The first record seen for each key is kept and input order is
    preserved. The returned dicts are the same objects as in *records*.
    """
    first_by_key: dict[tuple, dict] = {}
    for rec in records:
        # setdefault only stores rec when the key has not been seen yet.
        first_by_key.setdefault(tuple(map(rec.get, key_fields)), rec)
    return list(first_by_key.values())
from enum import Enum
class MergeStrategy(str, Enum):
    """Policies for collapsing records that share a key.

    Because ``str`` is mixed in, each member compares equal to its string
    value. For example, ``MergeStrategy.KEEP_FIRST == "keep_first"``.
    """

    # Keep the first record of each group.
    KEEP_FIRST = "keep_first"
    # Keep the last record of each group (position in input, not a timestamp).
    KEEP_LATEST = "keep_latest"
    # Combine the group's fields into one record.
    MERGE_FIELDS = "merge_fields"
def merge_duplicates(
    records: list[dict], key_fields: list[str], strategy: MergeStrategy
) -> list[dict]:
    """Group records by key and collapse each group into a single record.

    Args:
        records: Input records. They are not mutated.
        key_fields: Field names forming the grouping key. Each is read with
            ``record.get``, so a missing field and None compare equal.
        strategy: A MergeStrategy member or its string value.

            * KEEP_FIRST returns the group's first record in input order.
            * KEEP_LATEST returns the group's last record in input order. No
              timestamp is consulted.
            * MERGE_FIELDS builds a new dict per group. For each field, the
              last non-None value wins. A field that is None in every record
              of the group is kept, with the value None.

    Returns:
        One record per distinct key, in order of each key's first appearance.

    Raises:
        ValueError: If *strategy* is not a valid MergeStrategy value.
    """
    # Validate up front. Otherwise an unknown strategy would match no branch
    # and silently discard every record.
    strategy = MergeStrategy(strategy)

    groups: dict[tuple, list[dict]] = {}
    for record in records:
        key = tuple(record.get(f) for f in key_fields)
        groups.setdefault(key, []).append(record)

    if strategy is MergeStrategy.KEEP_FIRST:
        return [group[0] for group in groups.values()]
    if strategy is MergeStrategy.KEEP_LATEST:
        return [group[-1] for group in groups.values()]

    # MergeStrategy.MERGE_FIELDS
    merged = []
    for group in groups.values():
        combined: dict = {}
        for record in group:
            for field, value in record.items():
                if value is not None:
                    combined[field] = value
                else:
                    # Preserve the field even if no record supplies a value.
                    combined.setdefault(field, None)
        merged.append(combined)
    return merged
from pathlib import Path
from datetime import datetime
class StagingArea:
    """File-based staging area with one directory per batch under *base_dir*.

    Each batch directory holds two files:
    - ``data.json``: the staged records;
    - ``metadata.json``: batch id, record count, timestamps, and status.
    """

    def __init__(self, base_dir: str):
        self.base = Path(base_dir)

    def _batch_dir(self, batch_id: str) -> Path:
        """Return the directory for *batch_id*, rejecting ids that escape the base.

        Raises:
            ValueError: If *batch_id* resolves to the base directory itself or
                to a location outside it (for example ``".."``, ``"../x"``, or
                an absolute path).
        """
        base = self.base.resolve()
        target = (base / batch_id).resolve()
        if target == base or base not in target.parents:
            raise ValueError(
                f"Invalid batch_id {batch_id!r}: must name a directory inside {self.base}"
            )
        return self.base / batch_id

    @staticmethod
    def _write_atomic(path: Path, text: str) -> None:
        """Write *text* to *path* via a sibling temp file, then rename it over.

        ``Path.replace`` uses ``os.replace``, which is atomic on POSIX. Readers
        therefore see either the old file or the complete new one, never a
        half-written file.
        """
        tmp = path.with_name(path.name + ".tmp")
        tmp.write_text(text, encoding="utf-8")
        tmp.replace(path)

    def stage(self, batch_id: str, records: list[dict]) -> Path:
        """Write *records* and a ``"staged"`` metadata file for *batch_id*.

        Records are serialized with ``default=str``. Values JSON cannot
        represent natively (for example datetimes) are stored as strings and
        come back from promote() as strings.

        Returns:
            The batch directory.

        Raises:
            ValueError: If *batch_id* would escape the staging base directory.
        """
        stage_dir = self._batch_dir(batch_id)
        stage_dir.mkdir(parents=True, exist_ok=True)
        # The data file is written before the metadata file.
        self._write_atomic(
            stage_dir / "data.json", json.dumps(records, indent=2, default=str)
        )
        self._write_atomic(
            stage_dir / "metadata.json",
            json.dumps(
                {
                    "batch_id": batch_id,
                    "record_count": len(records),
                    # Timezone-aware local time, so timestamps are unambiguous.
                    "staged_at": datetime.now().astimezone().isoformat(),
                    "status": "staged",
                },
                indent=2,
            ),
        )
        return stage_dir

    def promote(self, batch_id: str) -> list[dict]:
        """Mark a staged batch as promoted and return its records.

        Raises:
            ValueError: If *batch_id* would escape the staging base directory.
            FileNotFoundError: If the batch has not been staged.
        """
        stage_dir = self._batch_dir(batch_id)
        data = json.loads((stage_dir / "data.json").read_text(encoding="utf-8"))
        meta_path = stage_dir / "metadata.json"
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
        meta["status"] = "promoted"
        meta["promoted_at"] = datetime.now().astimezone().isoformat()
        self._write_atomic(meta_path, json.dumps(meta, indent=2))
        return data
class IngestionPipeline:
    """Run one source through extract → validate → transform → deduplicate → load.

    The four collaborators are duck-typed. This class calls:

    * ``extractor.extract(source)`` — may return the records directly or an
      awaitable that resolves to them, so both sync and async extractors
      work. Async generators are not supported.
    * ``validator.validate(records)`` — must return an object with ``valid``
      and ``invalid`` lists.
    * ``transformer.transform(records)`` — returns the transformed records.
    * ``loader.load(records)`` — is awaited; its result is reported as
      ``"loaded"``.
    """

    def __init__(self, extractor, validator, transformer, loader, dedup_keys=("id",)):
        self.extractor = extractor
        self.validator = validator
        self.transformer = transformer
        self.loader = loader
        # Fields whose combined values identify a record for deduplication.
        self.dedup_keys = list(dedup_keys)

    async def run(self, source: str) -> dict:
        """Ingest *source* and return per-stage record counts.

        Returns:
            A dict with these keys:
            - ``extracted``, ``valid``, ``invalid``, ``unique``: record counts;
            - ``loaded``: whatever ``loader.load`` returned.
        """
        import inspect

        # Extract. Sync extractors return records; async ones return an awaitable.
        raw = self.extractor.extract(source)
        if inspect.isawaitable(raw):
            raw = await raw

        # Validate. Invalid records are counted and dropped, never loaded.
        validation = self.validator.validate(raw)
        if validation.invalid:
            # NOTE(review): `log` is not defined in this class. It is assumed
            # to be a module-level structured logger that accepts keyword
            # fields (structlog-style) — confirm it exists.
            log.warning("validation_failures", count=len(validation.invalid))

        # Transform, then deduplicate (first record per key wins).
        transformed = self.transformer.transform(validation.valid)
        unique = deduplicate(transformed, key_fields=self.dedup_keys)

        # Load.
        loaded = await self.loader.load(unique)

        return {
            "extracted": len(raw),
            "valid": len(validation.valid),
            "invalid": len(validation.invalid),
            "unique": len(unique),
            "loaded": loaded,
        }
npx claudepluginhub a-organvm/a-i--skills --plugin document-skills
Provides a checklist for code reviews covering functionality, security, performance, maintainability, tests, and quality. Use for pull requests, audits, team standards, and developer training.