From data-engineering
Designs scalable data pipeline architectures for batch and streaming processing based on requirements. Covers ETL/ELT patterns, Airflow/Prefect orchestration, dbt transformations, data quality, and Delta Lake/Iceberg storage.
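Airflow and Prefect both describe a pipeline as tasks connected by dependency edges, with retries when a task fails. The sketch below illustrates only that execution model, not either tool's API. It uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to run tasks in dependency order and retry each one up to `max_retries` times. `run_dag` and the `extract`/`transform`/`load` tasks are hypothetical names chosen for illustration.

```python
from graphlib import TopologicalSorter
import time
from typing import Callable


def run_dag(
    tasks: dict[str, Callable[[], object]],
    deps: dict[str, set[str]],
    max_retries: int = 2,
    retry_delay_s: float = 0.0,
) -> list[str]:
    """Run each task after its upstream tasks, retrying failures.

    `deps` maps a task name to the names it depends on. Returns the
    order in which tasks completed; raises RuntimeError once a task
    has failed max_retries + 1 times.
    """
    # Include every task, even ones with no upstream dependencies.
    graph = {name: deps.get(name, set()) for name in tasks}
    order: list[str] = []
    for name in TopologicalSorter(graph).static_order():
        for attempt in range(max_retries + 1):
            try:
                tasks[name]()
                break
            except Exception as exc:
                if attempt == max_retries:
                    raise RuntimeError(
                        f"task {name!r} failed after {attempt + 1} attempts"
                    ) from exc
                time.sleep(retry_delay_s)  # simple fixed delay between attempts
        order.append(name)
    return order


# Usage: extract -> transform -> load, where transform fails once.
calls: list[str] = []
attempts = {"transform": 0}


def extract() -> None:
    calls.append("extract")


def transform() -> None:
    attempts["transform"] += 1
    if attempts["transform"] == 1:
        raise ConnectionError("transient failure")
    calls.append("transform")


def load() -> None:
    calls.append("load")


order = run_dag(
    {"extract": extract, "transform": transform, "load": load},
    {"transform": {"extract"}, "load": {"transform"}},
)
print(order)  # ['extract', 'transform', 'load']
```

Real orchestrators add scheduling, parallel execution of independent tasks, and persisted task state; this sketch runs everything sequentially in a single process.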
Trigger: Slash command

`/data-engineering:data-pipeline`
# Data Pipeline Architecture

You are a data pipeline architecture expert specializing in scalable, reliable, and cost-effective data pipelines for batch and streaming data processing.

## Requirements

$ARGUMENTS

## Core Capabilities

- Design ETL/ELT, Lambda, Kappa, and Lakehouse architectures
- Implement batch and streaming data ingestion
- Build workflow orchestration with Airflow/Prefect
- Transform data using dbt and Spark
- Manage Delta Lake/Iceberg storage with ACID transactions
- Implement data quality frameworks (Great Expectations, dbt tests)
- Monitor pipelines with CloudWatch/...
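dbt's generic tests (`not_null`, `unique`, `accepted_values`) express column-level assertions, and dbt treats a test as passing when its query returns no failing rows; Great Expectations expresses similar checks as expectations. As a stdlib-only sketch of that idea, assuming rows arrive as plain dicts, the hypothetical functions below each return the indexes of failing rows. They loosely mimic the dbt semantics (nulls are ignored by `unique` and `accepted_values`) and are not either tool's API; the `orders` columns and allowed `status` values are illustrative.

```python
from typing import Any, Iterable

Row = dict[str, Any]


def not_null(rows: list[Row], column: str) -> list[int]:
    """Indexes of rows where `column` is missing or None."""
    return [i for i, r in enumerate(rows) if r.get(column) is None]


def unique(rows: list[Row], column: str) -> list[int]:
    """Indexes of rows whose non-null `column` value already appeared earlier."""
    seen: set[Any] = set()
    dupes: list[int] = []
    for i, r in enumerate(rows):
        value = r.get(column)
        if value is None:
            continue  # nulls are not counted as duplicates
        if value in seen:
            dupes.append(i)
        seen.add(value)
    return dupes


def accepted_values(rows: list[Row], column: str, values: Iterable[Any]) -> list[int]:
    """Indexes of rows whose non-null `column` value is outside `values`."""
    allowed = set(values)
    return [
        i for i, r in enumerate(rows)
        if r.get(column) is not None and r.get(column) not in allowed
    ]


def run_checks(rows: list[Row]) -> dict[str, list[int]]:
    """Run a hypothetical suite for an orders table; empty lists mean pass."""
    return {
        "not_null(id)": not_null(rows, "id"),
        "unique(id)": unique(rows, "id"),
        "accepted_values(status)": accepted_values(
            rows, "status", {"placed", "shipped", "returned"}
        ),
    }


# Usage
orders = [
    {"id": 1, "status": "placed"},
    {"id": 2, "status": "shipped"},
    {"id": 2, "status": "lost"},
    {"id": None, "status": "placed"},
]
failures = {name: idx for name, idx in run_checks(orders).items() if idx}
print(failures)
# {'not_null(id)': [3], 'unique(id)': [2], 'accepted_values(status)': [2]}
```

Returning row indexes rather than a single boolean makes it straightforward to route the failing records to a dead-letter location instead of discarding the whole batch.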
## Examples

Example topics: Batch, Streaming, Airflow, Prefect, Great Expectations, dbt Tests, Delta Lake, Apache Iceberg, Monitoring, Cost Optimization.

### Batch
```python
# Batch ingestion with validation
from batch_ingestion import BatchDataIngester
from storage.delta_lake_manager import DeltaLakeManager
from data_quality.expectations_suite import DataQualityFramework

ingester = BatchDataIngester(config={})

# Watermark from the previous successful run
# (placeholder value; load it from your pipeline's state store)
last_run_timestamp = '2024-01-01T00:00:00'

# Extract with incremental loading
df = ingester.extract_from_database(
    connection_string='postgresql://host:5432/db',
    query='SELECT * FROM orders',
    watermark_column='updated_at',
    last_watermark=last_run_timestamp
)

# Validate
schema = {'required_fields': ['id', 'user_id'], 'dtypes': {'id': 'int64'}}
df = ingester.validate_and_clean(df, schema)

# Data quality checks
# (inspect `result` before writing; its structure depends on DataQualityFramework)
dq = DataQualityFramework()
result = dq.validate_dataframe(df, suite_name='orders_suite', data_asset_name='orders')

# Write to Delta Lake
delta_mgr = DeltaLakeManager(storage_path='s3://lake')
delta_mgr.create_or_update_table(
    df=df,
    table_name='orders',
    partition_columns=['order_date'],
    mode='append'
)

# Save failed records
ingester.save_dead_letter_queue('s3://lake/dlq/orders')
```
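The internals of `BatchDataIngester` are not shown here. As a hedged, stdlib-only sketch of what watermark-based incremental loading plus dead-letter routing generally involves, the code below uses an in-memory `sqlite3` table as the source. `extract_incremental`, `split_valid`, and the `orders` columns are hypothetical, and `updated_at` values are assumed to be ISO-8601 strings in a single format so that text comparison orders them correctly.

```python
import sqlite3
from typing import Any

REQUIRED_FIELDS = ("id", "user_id")  # mirrors required_fields in the batch example


def extract_incremental(
    conn: sqlite3.Connection, last_watermark: str
) -> tuple[list[dict[str, Any]], str]:
    """Fetch rows updated after `last_watermark`; return them with the new watermark."""
    cur = conn.execute(
        "SELECT id, user_id, updated_at FROM orders "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_watermark,),  # bound parameter, not string formatting
    )
    columns = [d[0] for d in cur.description]
    rows = [dict(zip(columns, values)) for values in cur.fetchall()]
    # Advance the watermark only if new rows arrived.
    new_watermark = rows[-1]["updated_at"] if rows else last_watermark
    return rows, new_watermark


def split_valid(
    rows: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Route rows missing a required field to a dead-letter list instead of dropping them."""
    valid: list[dict[str, Any]] = []
    dead_letters: list[dict[str, Any]] = []
    for row in rows:
        missing = [f for f in REQUIRED_FIELDS if row.get(f) is None]
        if missing:
            dead_letters.append({"record": row, "error": f"missing {missing}"})
        else:
            valid.append(row)
    return valid, dead_letters


# Usage with an in-memory source table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, user_id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [
        (1, 10, "2024-01-01T00:00:00"),
        (2, None, "2024-01-02T00:00:00"),
        (3, 12, "2024-01-03T00:00:00"),
    ],
)
rows, watermark = extract_incremental(conn, "2024-01-01T00:00:00")
valid, dead_letters = split_valid(rows)
print([r["id"] for r in valid], watermark)  # [3] 2024-01-03T00:00:00
print(dead_letters[0]["error"])  # missing ['user_id']
```

The strict `>` comparison assumes `updated_at` only moves forward. A row committed late with a timestamp equal to the stored watermark would be skipped, so some pipelines re-read a small overlap window and deduplicate on `id`.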
Install: `npx claudepluginhub arogyareddy/https-github.com-wshobson-agents --plugin data-engineering`

## Related Commands

- `/pipeline`: Designs, implements, tests, and debugs data pipelines and ETL with quality validation, schema contracts, observability, and error handling. Supports Airflow, dbt, Dagster, and Prefect.
- `/pipeline`: Scaffolds data pipelines (Airflow, Dagster) with best-practice patterns; generates DAG structure, task definitions, error handling, and scheduling from a description or spec file.
- `/etl`: Designs and implements ETL/ELT data pipelines from a required requirements description and an optional pattern such as batch or streaming.
- `/pipeline`: Creates or repairs Redpanda Connect pipeline configurations interactively, with guidance and validation, using required context and an optional existing file.
- `/adf-create-pipeline`: Generates Azure Data Factory (ADF) pipeline JSON from a pipeline name and a requirements description, applying best practices for structure, naming conventions, activities, nesting rules, and common patterns such as Copy with Lookup.