From opengov-garden
This skill provides expertise in managing Apache Airflow DAGs for ETL orchestration. It should be used when the user asks about "airflow dag", "create workflow", "dag management", "airflow orchestration", "schedule dags", or similar Airflow-related topics.
How this skill is triggered — by the user, by Claude, or both
Slash command
/opengov-garden:airflow-management
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
This skill provides expertise in managing Apache Airflow DAGs for ETL orchestration. It should be used when the user asks about "airflow dag", "create workflow", "dag management", "airflow orchestration", "schedule dags", or similar Airflow-related topics.
This skill provides expertise in managing Apache Airflow DAGs for ETL orchestration. It should be used when the user asks about "airflow dag", "create workflow", "dag management", "airflow orchestration", "schedule dags", or similar Airflow-related topics.
Master the creation, management, and orchestration of Apache Airflow DAGs for ETL pipelines with proper error handling, wiring, and monitoring.
Every DAG should follow this standard structure:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# Import project-specific utilities
from src.database.db_adapter import DBAdapter
from src.utils.logger import setup_logger
logger = setup_logger(__name__)

# Default arguments applied to all tasks
default_args = {
    'owner': 'opengov-harvester',
    'depends_on_past': False,
    # NOTE(review): no 'email' recipients are set here; Airflow needs a recipient
    # list (and SMTP configuration) for this to send anything — confirm where
    # recipients are configured.
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    # Retry delay grows from retry_delay on each attempt, capped at max_retry_delay.
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# DAG definition
dag = DAG(
    'opengov_example_workflow',
    default_args=default_args,
    description='Example workflow for OpenGov data extraction',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    # Use a fixed start_date: dynamic values such as days_ago(1) are deprecated
    # and discouraged by Airflow because they move every time the file is parsed.
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't backfill the intervals between start_date and now
    tags=['opengov', 'etl', 'extraction'],
    max_active_runs=1,  # Prevent concurrent runs
)
def task_with_error_handling(**context):
    """Run the extraction and record its outcome in the database.

    On success the result is recorded with ``db.log_success`` and returned.
    On failure the error is logged locally and recorded with ``db.log_error``,
    then the original exception is re-raised so Airflow marks the task failed.
    The database adapter is always closed.

    Args:
        **context: Airflow task context; reads ``dag``, ``task`` and ``run_id``.

    Returns:
        Whatever ``perform_extraction()`` returns.
    """
    dag_id = context['dag'].dag_id
    task_id = context['task'].task_id
    run_id = context['run_id']
    db = DBAdapter()
    try:
        logger.info("Starting task execution")
        result = perform_extraction()
        db.log_success(dag_id=dag_id, task_id=task_id, run_id=run_id, result=result)
        return result
    except Exception as e:
        # Log locally first so the traceback survives even if DB logging fails.
        logger.error(f"Task failed: {e}", exc_info=True)
        try:
            db.log_error(dag_id=dag_id, task_id=task_id, run_id=run_id, error=str(e))
        except Exception:
            # A failure while recording the error must not mask the original one.
            logger.exception("Failed to record task error in database")
        raise  # Re-raise the original exception to mark the task as failed
    finally:
        db.close()
# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=task_with_error_handling,
    dag=dag,
)

# Set task dependencies here, e.g. ``extract_task >> transform_task``.
# With a single task there is nothing to wire.
Extract → Transform → Load pattern:
from airflow.operators.python import PythonOperator
def extract(**context):
    """Pull raw records from the source system.

    The returned value is what the ``transform`` task later pulls from XCom.
    """
    logger.info("Extracting data...")
    # Extraction logic goes here; it should produce ``extracted_data``.
    return extracted_data
def transform(**context):
    """Reshape the records produced by the ``extract`` task.

    Reads the upstream output from XCom and returns the transformed records,
    which the ``load`` task pulls in turn.
    """
    upstream = context['ti'].xcom_pull(task_ids='extract')
    logger.info(f"Transforming {len(upstream)} records...")
    # Transformation logic goes here; it should produce ``transformed_data``.
    return transformed_data
def load(**context):
    """Write the records pulled from the ``transform`` task's XCom to the destination."""
    records = context['ti'].xcom_pull(task_ids='transform')
    logger.info(f"Loading {len(records)} records...")
    # Load logic goes here.
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

# Linear pipeline: extract, then transform, then load.
extract_task >> transform_task >> load_task
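Process multiple items in parallel with dynamic task mapping (`.expand()`, Airflow 2.3+), which creates one mapped task instance per item: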
from airflow.decorators import task
@task
def get_project_list():
    """Return the IDs of projects still pending processing.

    The list feeds ``process_project.expand``, producing one mapped task per ID.
    The database adapter is closed even if the query fails.
    """
    db = DBAdapter()
    try:
        # Build the list while the connection is still open, in case the
        # adapter returns a lazy cursor.
        return [p['project_id'] for p in db.get_pending_projects()]
    finally:
        db.close()
@task
def process_project(project_id):
    """Handle a single project; runs as its own mapped task instance per ID."""
    logger.info(f"Processing project: {project_id}")
    # Processing logic goes here.
    outcome = {"project_id": project_id, "status": "completed"}
    return outcome
@task
def aggregate_results(results):
    """Summarize the outputs collected from every mapped ``process_project`` run."""
    project_count = len(results)
    logger.info(f"Processed {project_count} projects")
    # Persist the aggregated results here.
from datetime import datetime

# Spell out the scheduling arguments explicitly instead of a positional placeholder.
with DAG(
    'opengov_parallel_processing',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # Trigger manually; set a cron string to schedule it
    catchup=False,
) as dag:
    projects = get_project_list()
    # One process_project task instance is created per project ID at runtime.
    results = process_project.expand(project_id=projects)
    aggregate_results(results)
Execute different paths based on conditions:
from airflow.operators.python import BranchPythonOperator
def check_data_freshness(**context):
    """Choose which branch to run based on how recently the data was updated.

    Returns:
        ``'skip_extraction'`` if the last update is less than 6 hours old,
        otherwise ``'run_extraction'``. The value is the task_id that the
        BranchPythonOperator should follow.
    """
    from datetime import datetime, timedelta

    db = DBAdapter()
    try:
        last_update = db.get_last_update_time()
    finally:
        # Close even if the query fails so the branch check doesn't leak a connection.
        db.close()

    # NOTE(review): presumably None means nothing has been loaded yet — treat as stale.
    if last_update is None:
        return 'run_extraction'
    # timedelta has no ``.hours`` attribute; compare against a timedelta instead.
    # NOTE(review): assumes last_update is naive local time like datetime.now();
    # if the adapter returns timezone-aware values, use datetime.now(timezone.utc).
    if datetime.now() - last_update < timedelta(hours=6):
        return 'skip_extraction'  # Task ID to execute
    return 'run_extraction'
# The branch callable returns one of the two downstream task_ids.
branch_task = BranchPythonOperator(
    task_id='check_freshness',
    python_callable=check_data_freshness,
    dag=dag,
)
run_task = PythonOperator(
    task_id='run_extraction',
    python_callable=extract_data,
    dag=dag,
)
skip_task = PythonOperator(
    task_id='skip_extraction',
    python_callable=lambda: logger.info("Skipping - data is fresh"),
    dag=dag,
)
branch_task.set_downstream([skip_task, run_task])
Configure database connections correctly in DAGs:
import os
from src.database.db_adapter import DBAdapter
def get_database_config():
    """Build DBAdapter keyword arguments from the environment.

    ``DB_STORAGE_TARGET`` selects the backend. ``'supabase'`` yields a Postgres
    config read from the ``SUPABASE_DB_*`` variables. Any other value (the
    default is ``'sqlite'``) yields a SQLite config pointing at ``SQLITE_DB_PATH``.
    """
    env = os.getenv
    if env('DB_STORAGE_TARGET', 'sqlite') != 'supabase':
        return {
            'type': 'sqlite',
            'database': env('SQLITE_DB_PATH', 'data/db/opengov_state.db'),
        }
    return {
        'type': 'postgres',
        'host': env('SUPABASE_DB_HOST', 'localhost'),
        'port': int(env('SUPABASE_DB_PORT', 54322)),
        'database': env('SUPABASE_DB_NAME', 'postgres'),
        'user': env('SUPABASE_DB_USER', 'postgres'),
        'password': env('SUPABASE_DB_PASSWORD'),
    }
def task_with_db(**context):
    """Run a query against whichever database get_database_config() selects.

    The adapter is always closed, even if the query raises.
    """
    db = DBAdapter(**get_database_config())
    try:
        db.execute("SELECT * FROM projects")
    finally:
        db.close()
Validate required environment variables at DAG load time, so misconfiguration fails fast as a DAG import error instead of partway through a run:
import os
# Environment variables that validate_environment() requires to be non-empty.
# NOTE(review): this requires both SUPABASE_URL and SQLITE_DB_PATH even though
# DB_STORAGE_TARGET selects a single backend, and get_database_config() has a
# default for SQLITE_DB_PATH — confirm both are really required.
REQUIRED_ENV_VARS = [
    'DB_STORAGE_TARGET',
    'SUPABASE_URL',
    'SQLITE_DB_PATH',
]
def validate_environment(required_vars=None):
    """Validate that required environment variables are set.

    Args:
        required_vars: Iterable of variable names to check. Defaults to
            ``REQUIRED_ENV_VARS``, so existing zero-argument calls are unchanged.

    Raises:
        EnvironmentError: listing every variable that is unset or empty,
            in the order given.
    """
    if required_vars is None:
        required_vars = REQUIRED_ENV_VARS
    # os.getenv returns '' for a set-but-empty variable; treat that as missing too.
    missing = [var for var in required_vars if not os.getenv(var)]
    if missing:
        raise EnvironmentError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
# Validate on DAG load: this runs at import time, so a missing variable raises
# EnvironmentError while the file is being parsed and the DAG fails to import.
validate_environment()
Report all errors to database table:
def log_dag_error(context):
    """Record a failed task in the ``run_errors`` table.

    Intended for use as an ``on_failure_callback``.

    Args:
        context: Airflow task context; reads ``dag``, ``task_instance``,
            ``execution_date``, ``exception`` and ``run_id``.
    """
    # Build the record before opening a connection so a bad context can't leak one.
    error_record = {
        'dag_id': context['dag'].dag_id,
        'task_id': context['task_instance'].task_id,
        'execution_date': context['execution_date'],
        # .get(): don't raise KeyError inside the callback if no exception is attached.
        'error_message': str(context.get('exception')),
        # NOTE(review): this stores the task's log URL, not a traceback — confirm
        # the column is meant to hold a link.
        'traceback': context['task_instance'].log_url,
        'run_id': context['run_id'],
    }
    db = DBAdapter()
    try:
        db.insert('run_errors', error_record)
    finally:
        # Close even if the insert fails so the callback doesn't leak a connection.
        db.close()
# Add to default_args (merge with the entries shown in the DAG template above)
default_args = {
    # ... other default arguments (owner, retries, retry_delay, ...) ...
    'on_failure_callback': log_dag_error,
}
Log all significant events:
def logged_task(**context):
    """Run ``do_work()`` with start, completion and failure logging plus elapsed time.

    Args:
        **context: Airflow task context; reads ``task_instance``, ``run_id``
            and ``execution_date``.

    Returns:
        The result of ``do_work()``.

    Raises:
        Any exception from ``do_work()``, re-raised after it is logged.
    """
    import time

    logger.info(f"Task started: {context['task_instance'].task_id}")
    logger.info(f"Run ID: {context['run_id']}")
    logger.info(f"Execution date: {context['execution_date']}")
    # monotonic() is unaffected by system clock adjustments, unlike datetime.now().
    start = time.monotonic()
    try:
        result = do_work()
    except Exception as e:
        duration = time.monotonic() - start
        # exc_info=True keeps the traceback in the task log.
        logger.error(f"Task failed after {duration}s: {e}", exc_info=True)
        raise
    duration = time.monotonic() - start
    logger.info(f"Task completed in {duration}s")
    return result
Track custom metrics for monitoring:
from airflow.metrics import Stats
def task_with_metrics(**context):
    """Run ``process_records()`` and emit custom metrics via Airflow ``Stats``.

    On success, emits a gauge of records processed, the task duration and a
    success counter. On any failure, increments a failure counter and re-raises.
    """
    import time
    from datetime import timedelta

    start_time = time.monotonic()
    try:
        records_processed = process_records()
        Stats.gauge('opengov.records.processed', records_processed)
        # Pass a timedelta so the unit is unambiguous: StatsD treats a bare
        # number as milliseconds, but this difference is in seconds.
        Stats.timing('opengov.task.duration', timedelta(seconds=time.monotonic() - start_time))
        Stats.incr('opengov.task.success')
    except Exception:
        Stats.incr('opengov.task.failure')
        raise
Create a DAG to monitor system health:
from airflow import DAG
from airflow.operators.python import PythonOperator
def check_database_health():
    """Verify the database answers a trivial query.

    Returns:
        ``"healthy"`` when ``SELECT 1`` succeeds.

    Raises:
        Exception: wrapping the underlying error, which is kept as ``__cause__``.
    """
    db = DBAdapter()
    try:
        db.execute("SELECT 1")
        return "healthy"
    except Exception as e:
        # Chain the cause so the original traceback appears in the task log.
        raise Exception(f"Database unhealthy: {e}") from e
    finally:
        db.close()
def check_supabase_health():
    """Check Supabase API health by requesting ``$SUPABASE_URL/rest/v1/``.

    Raises:
        Exception: if ``SUPABASE_URL`` is unset or empty, or the endpoint does
            not answer with HTTP 200.
    """
    import os

    base_url = os.getenv('SUPABASE_URL')
    if not base_url:
        # Without this guard the request would go to "None/rest/v1/" and fail confusingly.
        raise Exception("Supabase unhealthy: SUPABASE_URL is not set")

    import requests

    # Bound the request so a hung endpoint fails the check instead of blocking the worker.
    response = requests.get(f"{base_url.rstrip('/')}/rest/v1/", timeout=10)
    if response.status_code != 200:
        raise Exception(f"Supabase unhealthy: {response.status_code}")
from datetime import datetime

with DAG(
    'system_health_check',
    schedule_interval='*/15 * * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't backfill every 15-minute slot since start_date
    max_active_runs=1,
) as dag:
    db_check = PythonOperator(task_id='check_database', python_callable=check_database_health)
    supabase_check = PythonOperator(task_id='check_supabase', python_callable=check_supabase_health)
    # The two checks are independent; no dependency is declared between them.
Test DAG tasks independently:
# tests/test_dags.py
import pytest
from dags.opengov_extraction import extract_task, transform_task
def test_extract_task():
    """Call the extract task's callable directly with a minimal mocked context."""
    from unittest.mock import Mock

    context = {
        'dag': Mock(dag_id='test_dag'),
        'task': Mock(task_id='test_task'),
        'run_id': 'test_run',
    }

    result = extract_task.python_callable(**context)

    assert result is not None
    assert len(result) > 0
def test_dag_structure():
    """Verify the extraction DAG has its ETL tasks and that transform follows extract."""
    from dags.opengov_extraction import dag

    for expected_id in ('extract', 'transform', 'load'):
        assert expected_id in dag.task_ids

    downstream_ids = {t.task_id for t in dag.get_task('extract').downstream_list}
    assert 'transform' in downstream_ids
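DAGs must be importable without executing tasks. The scheduler re-parses DAG files frequently, so any top-level code (opening connections, querying databases) runs on every parse, not just when a task executes: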
# ❌ Bad - executes on import
db = DBAdapter()  # Opens connection at import time, i.e. whenever this file is parsed
# ✅ Good - executes at runtime
def get_db():
    """Create a DBAdapter only when called (e.g. from inside a task callable)."""
    return DBAdapter()
Detect and prevent circular dependencies:
# Test DAG file for errors: importing the file surfaces syntax and import errors
python dags/my_dag.py
# Check for circular dependencies (and other DAG-level errors) by loading the DAG
# and running it locally for the given logical date.
# NOTE: `airflow dags test` executes the DAG's tasks, so run it against a
# non-production environment.
airflow dags test opengov_workflow 2024-01-01
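Don't pass large datasets via XCom. With the default backend, XCom values are stored in Airflow's metadata database and are meant for small metadata; store bulk data externally and pass a reference instead: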
# ❌ Bad - passes large data via XCom
@task
def extract():
    # The return value is pushed to XCom for downstream tasks.
    return large_dataset  # Don't do this!
# ✅ Good - store in database/file
@task
def extract():
    # Persist the data externally and return only a small summary/reference.
    save_to_database(large_dataset)
    return {"records": len(large_dataset), "path": "db://opengov.projects"}
Use Garden commands to manage DAGs:
# Create new DAG from template (here: the "etl" template)
garden dag new my_workflow --template=etl
# List all DAGs
garden dag list
# Monitor specific DAG
garden dag monitor opengov_extraction
npx claudepluginhub heaney-investments/opengov-garden --plugin opengov-garden
Builds production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.
Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use for data pipelines, workflow orchestration, and batch jobs.
Builds production-ready Apache Airflow DAGs with patterns for operators, sensors, testing, and deployment. For data pipelines, workflow orchestration, and batch jobs.