From python
Builds and manages Temporal workflow orchestration in Python: worker setup, activity definitions, error handling with retries, signals, and queries.
How this skill is triggered — by the user, by Claude, or both
Slash command
/python:python-temporal
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
Temporal SDK patterns for building durable, distributed workflows in Python.
Temporal SDK patterns for building durable, distributed workflows in Python.
from temporalio.client import Client
from temporalio.worker import Worker
async def main():
    """Connect to Temporal at ``localhost:7233`` and run a worker.

    The worker polls the ``my-task-queue`` task queue and is registered
    with ``MyWorkflow`` and ``my_activity``.
    """
    temporal_client = await Client.connect("localhost:7233")
    # Build the worker and await its run loop in a single expression.
    await Worker(
        temporal_client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    ).run()
from temporalio import workflow
from datetime import timedelta
@workflow.defn
class MyWorkflow:
    """Workflow that runs ``my_activity`` on a name and returns a greeting."""

    @workflow.run
    async def run(self, name: str) -> str:
        """Workflow run method.

        Passes ``name`` to ``my_activity`` with a 30-second
        start-to-close timeout and returns ``"Hello "`` followed by the
        activity's result.
        """
        activity_timeout = timedelta(seconds=30)
        # Execute activity
        activity_output = await workflow.execute_activity(
            my_activity,
            name,
            start_to_close_timeout=activity_timeout,
        )
        return f"Hello {activity_output}"
from temporalio import activity
@activity.defn
async def my_activity(name: str) -> str:
    """Activity - can fail and retry.

    Returns ``name`` converted to upper case.
    """
    # Do work (database, API, etc.)
    uppercased = name.upper()
    return uppercased
from temporalio.client import Client
async def start_workflow():
    """Start ``MyWorkflow`` with the argument ``"World"`` and print its result.

    Connects to ``localhost:7233`` and starts the workflow with id
    ``my-workflow-id`` on the ``my-task-queue`` task queue.
    """
    temporal_client = await Client.connect("localhost:7233")
    workflow_handle = await temporal_client.start_workflow(
        MyWorkflow.run,
        "World",
        id="my-workflow-id",
        task_queue="my-task-queue",
    )
    # Await the workflow's result and print it.
    print(await workflow_handle.result())  # "Hello WORLD"
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError
@workflow.defn
class MyWorkflow:
    """Workflow showing how to handle an activity that fails after retries."""

    @workflow.run
    async def run(self) -> str:
        """Run ``risky_activity`` and return its result, or ``"Failed"``.

        The activity is executed with a 30-second start-to-close timeout
        and a retry policy of at most three attempts.

        Returns:
            The activity's result, or the string ``"Failed"`` when the
            activity call raises ``ActivityError``.
        """
        try:
            result = await workflow.execute_activity(
                risky_activity,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
        except ActivityError as err:
            # Handle failure after retries exhausted. Record the cause
            # before returning the fallback so it is not silently lost.
            workflow.logger.error("risky_activity failed: %s", err)
            return "Failed"
        return result
@workflow.defn
class OrderWorkflow:
    """Order workflow that waits for an approval signal before finishing."""

    def __init__(self):
        # Current order state; starts as "pending" and is set to
        # "approved" by the approve signal.
        self.status = "pending"

    def _is_approved(self) -> bool:
        # Predicate handed to workflow.wait_condition in run().
        return self.status == "approved"

    @workflow.signal
    def approve(self):
        """Signal to approve order"""
        self.status = "approved"

    @workflow.query
    def get_status(self) -> str:
        """Query current status"""
        return self.status

    @workflow.run
    async def run(self, order_id: str) -> str:
        """Wait until the status is "approved", then return "Order processed"."""
        await workflow.wait_condition(self._is_approved)
        return "Order processed"
See references/ for testing patterns and common workflow patterns.
npx claudepluginhub martinffx/atelier --plugin code
Implements durable workflows, saga patterns, and distributed transactions using the Temporal Python SDK. Covers async/await execution models, testing strategies, and production deployment.
<!-- AUTO-GENERATED by export-plugins.py — DO NOT EDIT -->
Design durable workflows with Temporal for distributed systems. Covers workflow vs activity separation, saga patterns, state management, and determinism constraints.