Aceteam Workflow Engine

A powerful, modular workflow orchestration system designed for composing complex computational tasks from smaller, configurable steps. This engine powers the workflow functionality in Aceteam.ai and is now available as an open-source package.
Overview
The Workflow Engine enables you to:
- Define workflows as directed acyclic graphs (DAGs)
- Chain node-based tasks with type-safe data passing
- Persist and retrieve node outputs using various storage backends
- Execute workflows programmatically or via API
Installation
pip install aceteam-workflow-engine
Claude Code skill (optional)
This repo also publishes a Claude Code plugin marketplace. If you use Claude Code, install the wengine skill so Claude can help you explore, compose, validate, and run workflows:
/plugin marketplace add aceteam-ai/workflow-engine
/plugin install wengine@aceteam-workflow-engine
Example
import asyncio
from workflow_engine import IntegerValue, Workflow
import workflow_engine.nodes
from workflow_engine.contexts import LocalContext
from workflow_engine.execution import TopologicalExecutionAlgorithm
context = LocalContext()
algorithm = TopologicalExecutionAlgorithm()
# Load and run a workflow
with open("examples/addition.json") as f:
workflow = Workflow.model_validate_json(f.read())
result = asyncio.run(algorithm.execute(
context=context,
workflow=workflow,
input={"c": IntegerValue(-256)},
)) # {'sum': 1811}
Check the examples directory for more sample workflows in JSON form:
Key Features
Core Functionality
- Graph-Based Execution: Workflows are executed as DAGs with automatic dependency resolution
- Type-Safe Data Flow: Data passing between nodes is validated using MIME types
- Flexible Storage: Supports multiple storage backends (Supabase, Local, In-Memory)
- Error Handling: Robust error propagation and logging system
- Versioning: Built-in support for workflow versioning
Workflow Structure
Workflows are defined as JSON with:
- input_node: Defines the workflow's input schema using
InputNode
- inner_nodes: The processing nodes that perform computations
- output_node: Defines the workflow's output schema using
OutputNode
- edges: Connect node outputs to inputs (including to/from input/output nodes)
Node Types
- InputNode: Special node defining workflow input fields and their types
- OutputNode: Special node defining workflow output fields and their types
- Processing Nodes: Execute computational tasks with configurable parameters
Storage Backends
- Supabase: Primary storage backend for production use
- Local: File-system based storage for development
- In-Memory: Lightweight storage for testing
Value Type Casting
The workflow engine supports automatic type casting between Value types. The graph below shows all available casting paths:

Value types serialize to JSON Schema for validation and type resolution. See Values for how the schema system works and limitations around deeply nested generics.
Architecture
src/workflow_engine/
├── contexts/ # Storage backend implementations
│ ├── in_memory.py # In-memory storage
│ └── local.py # Local file system storage
├── core/ # Core workflow components
│ ├── context.py # Execution context
│ ├── data.py # Data handling
│ ├── edge.py # Edge definitions
│ ├── execution.py # Execution logic
│ ├── file.py # File handling
│ ├── node.py # Node base classes
│ └── workflow.py # Workflow definitions
├── execution/ # Execution strategies
│ └── topological.py # DAG-based execution
├── nodes/ # Node implementations
│ ├── arithmetic.py # Math operations
│ ├── constant.py # Constant values
│ ├── json.py # JSON operations
│ └── text.py # Text operations
└── utils/ # Helper utilities
Development
Setup
# Using uv (recommended)
uv sync
uv run pre-commit install
Testing
uv run pytest # Runs both unit and integration tests
Documentation