From python-dev-workflow
This skill should be used when the user asks to "configure Celery", "set up Celery with Redis", "write Celery tasks", "add background tasks with Celery", "celery beat schedule", "celery task retry", "celery performance tuning", "celery TDD", "test Celery tasks", "celery worker config", "celery monitoring", "celery security", or when code imports celery or the project uses a celeryconfig/celery.py module.
Slash command: /python-dev-workflow:celery-expert
Model: opus
Guidance for production Celery with Redis as broker/result backend, covering task design, workflows, Beat scheduling, performance, monitoring, and security.
Core Principles:
Pydantic models in tasks: Do not manually serialize Pydantic models (.dict(), .model_dump(), **kwargs). Use celery-pydantic so tasks can accept models directly.
Activate when any of the following appear:
- from celery import Celery or @app.task / @shared_task in code
- celeryconfig.py, celery.py, or beat_schedule in project files

Follow this 4-step cycle for every task:

- task_always_eager=True for unit tests
- bind=True, proper error handling

    import pytest


    @pytest.fixture
    def celery_config():
        # Eager mode executes tasks in-process when .delay() is called,
        # so these tests need no external broker.
        return {
            'broker_url': 'memory://',
            'result_backend': 'cache+memory://',
            'task_always_eager': True,
            'task_eager_propagates': True,
        }


    class TestProcessOrder:
        def test_process_order_success(self, celery_app, celery_worker):
            from myapp.tasks import process_order
            result = process_order.delay(order_id=123)
            assert result.get(timeout=10) == {'order_id': 123, 'status': 'success'}

        def test_process_order_idempotent(self, celery_app, celery_worker):
            from myapp.tasks import process_order
            result1 = process_order.delay(order_id=123).get(timeout=10)
            result2 = process_order.delay(order_id=123).get(timeout=10)
            # Either status is acceptable; the point is that a repeated call is safe.
            assert result1['status'] in ['success', 'already_processed']
            assert result2['status'] in ['success', 'already_processed']

See examples/conftest_celery.py for complete pytest fixture setup.
Every task should follow these rules:
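- bind=True: access self.request.id, self.retry(), and task metadata
- Set soft_time_limit and time_limit
- acks_late=True for critical tasks: acknowledge after completion, not before

    # `app` is the Celery application instance.
    @app.task(
        bind=True,
        max_retries=3,
        acks_late=True,              # acknowledge only after the task finishes
        reject_on_worker_lost=True,  # requeue the message if the worker process dies
        time_limit=300,              # hard limit in seconds
        soft_time_limit=240,         # soft limit: raises SoftTimeLimitExceeded inside the task
        rate_limit='100/m',          # at most 100 tasks per minute per worker
    )
    def process_order(self, order_id: int):
        ...
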
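With acks_late=True a message can be delivered again if a worker dies partway through a task, so the task body should tolerate repeats. The following is a minimal sketch of that idea in plain Python rather than Celery code: process_order_once and the in-memory _processed dict are hypothetical stand-ins for a task body and a durable store such as Redis or a database. The status values match the ones the tests above accept.

```python
from typing import Dict

# Hypothetical in-memory stand-in for a durable store (Redis, a database table, ...).
_processed: Dict[int, dict] = {}


def process_order_once(order_id: int) -> dict:
    """Run the order's side effects at most once, however often the message arrives."""
    if order_id in _processed:
        # Redelivery (for example after a worker crash): skip the side effects.
        return {'order_id': order_id, 'status': 'already_processed'}

    # ... the real side effects (charging, emailing, etc.) would go here ...
    result = {'order_id': order_id, 'status': 'success'}
    _processed[order_id] = result
    return result


first = process_order_once(123)
second = process_order_once(123)
print(first['status'], second['status'])  # success already_processed
```

In a real deployment the check and the write would likely need to be a single atomic operation (for example a Redis SET with the NX option, or a unique database constraint), since two workers could otherwise both pass the check.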
    from celery import Celery

    # Redis serves as both the broker (db 0) and the result backend (db 1).
    app = Celery('myapp')
    app.conf.update(
        # Broker
        broker_url='redis://localhost:6379/0',
        broker_connection_retry_on_startup=True,
        broker_pool_limit=10,
        # Result backend
        result_backend='redis://localhost:6379/1',
        result_expires=3600,
        # Serialization: JSON only
        task_serializer='json',
        result_serializer='json',
        accept_content=['json'],
        # Reliability and time limits (seconds)
        task_acks_late=True,
        task_reject_on_worker_lost=True,
        task_time_limit=300,
        task_soft_time_limit=240,
        # Worker behaviour
        worker_prefetch_multiplier=4,
        worker_max_tasks_per_child=1000,
    )

See references/production-config.md for complete production setup with TLS, beat, and deployment.
| Pattern | Use When | Config |
|---|---|---|
| Manual retry | Custom backoff logic | self.retry(exc=exc, countdown=2 ** self.request.retries) |
| Auto-retry | Standard HTTP/network errors | autoretry_for=(RequestException,), retry_backoff=True |
| Auto-retry + jitter | High-concurrency retries | Add retry_jitter=True, retry_backoff_max=600 |
| Reject (no retry) | Permanent failures, poison messages | raise Reject(reason, requeue=False) |
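
To make the backoff rows concrete, here is a rough sketch of how an exponential delay with a cap and optional jitter can be computed. backoff_delay is a hypothetical helper written for illustration; it approximates the behaviour selected by retry_backoff, retry_backoff_max, and retry_jitter, but it is not Celery's own implementation.

```python
import random


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600, jitter: bool = False) -> int:
    """Return the seconds to wait before the next attempt, given `retries` so far."""
    # Exponential growth, capped so late retries do not wait indefinitely.
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick uniformly in [0, delay] so concurrent retries spread out.
        delay = random.randint(0, delay)
    return delay


schedule = [backoff_delay(n) for n in range(5)]
print(schedule)           # [1, 2, 4, 8, 16]
print(backoff_delay(20))  # 600 (capped)
```

Jitter matters most when many tasks fail at the same moment, for instance during a short outage of a downstream service: without it they would all retry on the same schedule.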
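    from celery import chain, group, chord

    # fetch_data, process_item, send_notification, and aggregate_results are
    # tasks defined elsewhere; url, urls, and items are the inputs.

    # CHAIN: Sequential (A -> B -> C)
    workflow = chain(fetch_data.s(url), process_item.s(), send_notification.s())

    # GROUP: Parallel execution
    job = group(fetch_data.s(url) for url in urls)

    # CHORD: Map-Reduce (parallel + callback)
    # Note: calling chord(header)(callback) dispatches immediately and returns an AsyncResult.
    workflow = chord(group(process_item.s(item) for item in items))(aggregate_results.s())
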
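As a plain-Python analogy for the data flow of the three primitives above (this is not the Celery API), the sketch below uses ordinary functions in place of tasks. run_chain, run_group, run_chord, double, and increment are all hypothetical names chosen for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import Any, Callable, Iterable, List


def run_chain(initial: Any, *steps: Callable[[Any], Any]) -> Any:
    """Chain: each step receives the previous step's return value."""
    return reduce(lambda value, step: step(value), steps, initial)


def run_group(func: Callable[[Any], Any], args: Iterable[Any]) -> List[Any]:
    """Group: independent calls run in parallel; results keep the input order."""
    with ThreadPoolExecutor() as pool:
        return list(pool.map(func, args))


def run_chord(func: Callable[[Any], Any], args: Iterable[Any],
              callback: Callable[[List[Any]], Any]) -> Any:
    """Chord: a group whose collected results are passed to a single callback."""
    return callback(run_group(func, args))


def double(x: int) -> int:
    return x * 2


def increment(x: int) -> int:
    return x + 1


chained = run_chain(3, double, increment)    # double(3) -> 6, increment(6) -> 7
grouped = run_group(double, [1, 2, 3])       # [2, 4, 6]
chorded = run_chord(double, [1, 2, 3], sum)  # sum([2, 4, 6]) -> 12
print(chained, grouped, chorded)
```

The analogy leaves out what Celery adds on top: the steps run on separate workers, results travel through the result backend, and a chord's callback only fires once every task in its group has finished.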
Before writing code:
Before deploying:
- acks_late=True on critical tasks

NEVER: use pickle serialization, run without time limits, store large data in results, create non-idempotent tasks, run without broker auth, expose Flower without auth, manually serialize Pydantic models (use celery-pydantic)
ALWAYS: use JSON serialization, set time limits (soft + hard), make tasks idempotent, use acks_late=True for critical tasks, set result expiration, implement retry with backoff, monitor with Flower/Prometheus, log with correlation IDs
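
Several of the NEVER/ALWAYS rules can be checked mechanically against a settings dict before deployment. The sketch below is one possible way to do that, assuming settings use the lowercase names shown earlier; check_celery_config and its messages are hypothetical, the password in the example URL is a placeholder, and the checks cover only a subset of the rules.

```python
from typing import Any, Dict, List
from urllib.parse import urlparse


def check_celery_config(conf: Dict[str, Any]) -> List[str]:
    """Return a list of rule violations found in a Celery settings dict."""
    problems: List[str] = []

    # A missing serializer key is treated as JSON, which is Celery's default.
    serializers = [conf.get('task_serializer', 'json'), conf.get('result_serializer', 'json')]
    accepted = list(conf.get('accept_content', ['json']))
    if 'pickle' in serializers + accepted:
        problems.append('pickle serialization enabled')

    hard = conf.get('task_time_limit')
    soft = conf.get('task_soft_time_limit')
    if hard is None or soft is None:
        problems.append('soft or hard time limit missing')
    elif soft >= hard:
        problems.append('soft time limit is not below hard time limit')

    if conf.get('result_expires') is None:
        problems.append('result_expires not set')

    if urlparse(conf.get('broker_url', '')).password is None:
        problems.append('broker URL has no password')

    return problems


hardened = {
    'broker_url': 'redis://:example-password@localhost:6379/0',  # placeholder credential
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
    'task_time_limit': 300,
    'task_soft_time_limit': 240,
    'result_expires': 3600,
}
risky = {'broker_url': 'redis://localhost:6379/0', 'task_serializer': 'pickle'}

print(check_celery_config(hardened))  # []
print(check_celery_config(risky))
```

A check like this could run in CI or at worker startup; rules such as idempotency or Flower authentication cannot be verified from settings alone and still need review.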
- references/performance-patterns.md: Task chunking, prefetch tuning, result backend optimization, connection pooling, queue routing
- references/production-config.md: Complete Redis production config, beat scheduling, worker deployment, health checks
- references/security-standards.md: JSON serializer enforcement, Redis TLS/auth, input validation, Flower security
- references/common-mistakes.md: Anti-pattern table with fixes for pickle, idempotency, time limits, large results, pydantic serialization

Working code examples in examples/:
- examples/celery_app.py: App factory with Redis broker and recommended defaults
- examples/tasks.py: Task definitions with retry, chunking, and idempotency patterns
- examples/celery_beat_schedule.py: Crontab and interval periodic task config
- examples/conftest_celery.py: Pytest fixtures for eager and integration testing

npx claudepluginhub alex-kopylov/zweihander --plugin python-dev-workflow