Guide for writing Kitaru durable workflows and operational control paths. Use when creating or refactoring Kitaru flows, checkpoints, waits, logging, artifacts, tracked LLM calls, replay/resume/retry flows, KitaruClient usage, CLI commands, MCP operations, deployments, secrets, or adapter integrations for PydanticAI, OpenAI Agents, LangGraph, Claude Agent SDK, and Gemini Interactions. Triggers on mentions of kitaru, @flow, @checkpoint, kitaru.wait, kitaru.log, kitaru.save, kitaru.load, KitaruClient, replay, resume, retry, `kitaru executions ...`, MCP tools, `KitaruAgent`, `KitaruRunner`, `KitaruGraphRunner`, `KitaruClaudeRunner`, `KitaruGeminiInteractionsRunner`, `GeminiInteractionRequest`, `wait_for_input`, `wait_for_approval`, `wait_for_interrupt`, `requires_action`, Antigravity, or migration from deprecated `wrap(...)`.
Slash command: `/kitaru:kitaru-authoring`
Use this guide when writing or refactoring Kitaru workflows and when choosing which Kitaru surface to use for running, observing, replaying, controlling, deploying, or inspecting durable artifacts and external state references for those workflows.
Before building: If the workflow shape is still fuzzy, suggest the `kitaru-scoping` skill first. It helps the user decide whether Kitaru is a fit, where checkpoints and waits belong, which values should become explicit artifacts, and which replay anchors should be stable before code gets written.
Think of a Kitaru flow like a long trip with named save points and labeled boxes for durable outputs.
- `@flow` is the durable outer boundary.
- `@checkpoint` is a replay boundary inside that flow.
- `wait()` pauses at the flow level and resumes later with input.
- Start a flow with `.run(...)`, not by calling the decorated function directly.

```python
from kitaru import checkpoint, flow, wait


@checkpoint
def draft(topic: str) -> str:
    return f"Draft for {topic}"


@flow
def review_flow(topic: str) -> str:
    text = draft(topic)
    # Pause at flow scope until a reviewer answers.
    approved = wait(name="approve_draft", question="Approve draft?", schema=bool)
    if not approved:
        return "Rejected"
    return text


# Submit through .run(...); the returned handle tracks the execution.
handle = review_flow.run("Durable agents")
print(handle.exec_id)
```
A `FlowHandle` is the object you use after submission:

- `handle.exec_id` -> execution ID
- `handle.status` -> current execution status
- `handle.wait()` -> block until terminal state and return the result
- `handle.get()` -> fetch final result (or raise on failure)

Enforce these rules when writing or reviewing Kitaru code:
- Do not call `wait()` inside a checkpoint.
- `save()` and `load()` require checkpoint scope.
- `log()` works in flow scope and checkpoint scope, but it attaches metadata to different targets depending on where it runs.
- `.submit()`, `.map()`, and `.product()` are for work launched from inside a running flow.
- `llm()` is valid only inside a `@flow`; outside a checkpoint it gets a synthetic `llm_call` checkpoint automatically.

`@flow`

Use `@flow` for the durable orchestration boundary.
- Options: `stack`, `image`, `cache`, `retries`
- `.run(...)`: pass `stack="..."` to target a remote stack
- `.replay(exec_id, from_=..., overrides=..., **flow_inputs)`
- Use `current_execution_id()` inside a running flow/checkpoint when code needs to record or pass along the active execution ID. It returns `None` outside a Kitaru execution.

`@checkpoint`

Use `@checkpoint` for meaningful replayable units of work.
- Options: `retries`, `type`, `cache`
- `.submit(...)`, `.map(...)`, `.product(...)`

`wait(...)`

`wait(*, schema=bool, name=None, question=None, timeout=None, metadata=None)` pauses the flow until input arrives.

- Use stable `name` values.

`log(...)`

`log(**kwargs)` records structured metadata.
`save(...)` / `load(...)`

Use explicit artifacts when a checkpoint should publish named outputs for later inspection or reuse.

- `save(name, value, *, type="output", tags: list[str] | None = None)` requires checkpoint scope
- `load(exec_id, name)` requires checkpoint scope and an execution UUID string; it can retrieve both explicit `save(...)` artifacts and implicit checkpoint outputs by checkpoint/output name
- Artifact `type` values: `prompt`, `response`, `context`, `input`, `output`, `blob`

Kitaru's current shipped public surface does not include a native key-value state API. If a workflow needs information to survive across executions, choose one of these explicit patterns instead:
- Publish artifacts with `save(...)` inside checkpoints and inspect them later through `KitaruClient.artifacts` or MCP artifact tools.
- Use the model registry and secrets only for `llm(...)` aliases, not for arbitrary workflow state.

Do not invent SDK helpers, CLI commands, or MCP tools for native durable key-value state. When replay correctness matters, make the critical value an explicit checkpoint output or saved artifact so the exact value from the source execution remains inspectable.
`llm(...)`

`llm(prompt, *, model=None, system=None, temperature=None, max_tokens=None, name=None) -> str`

- Valid only inside a `@flow`
- `model` names an alias
- `llm()` currently auto-resolves alias-linked secrets; other primitives do not have this behavior
- Outside a checkpoint, the call gets a synthetic `llm_call` checkpoint so the call is still tracked and replayable

Replay is one shared concept exposed through several surfaces.
- `flow.replay(exec_id, from_=..., overrides=..., **flow_inputs)`
- `KitaruClient().executions.replay(exec_id, from_=..., overrides=..., **flow_inputs)`
- `kitaru executions replay <exec_id> --from <selector> [--override checkpoint.<name>=<value>]`
- `kitaru_executions_replay`

`from_` targets a checkpoint selector: a checkpoint name, invocation ID, or call ID. Wait selectors are not valid replay anchors.

Override keys must use the `checkpoint.<selector>` namespace:

- `checkpoint.<name>`: replace the cached output of that checkpoint
- `wait.*` overrides are not supported; if the replayed execution reaches a wait, resolve it via `client.executions.input(...)` or `kitaru executions input`

Do not invent alternate replay APIs or made-up override keys.
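The override-key rules above can be checked before a replay is submitted. The following is a minimal sketch, not a Kitaru API: `parse_override` and `build_overrides` are hypothetical helper names, and the value is kept as a raw string because how Kitaru decodes override values is not covered here.

```python
def parse_override(arg: str) -> tuple[str, str]:
    """Split 'checkpoint.<selector>=<value>' into (key, raw_value).

    Hypothetical pre-flight check; not part of Kitaru.
    """
    key, sep, raw_value = arg.partition("=")
    if not sep:
        raise ValueError(f"expected KEY=VALUE, got {arg!r}")
    namespace, _, selector = key.partition(".")
    if namespace == "wait":
        # The guide states that wait.* overrides are not supported.
        raise ValueError(
            "wait.* overrides are not supported; resolve the wait with "
            "'kitaru executions input' instead"
        )
    if namespace != "checkpoint" or not selector:
        raise ValueError(f"override key must look like checkpoint.<selector>: {key!r}")
    return key, raw_value


def build_overrides(args: list[str]) -> dict[str, str]:
    """Collect validated overrides into a dict keyed by 'checkpoint.<selector>'."""
    return dict(parse_override(arg) for arg in args)


overrides = build_overrides(["checkpoint.draft=Edited draft text"])
print(overrides)
```

Rejecting `wait.*` keys early keeps the mistake out of the replay request instead of surfacing it after submission.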
When a flow hits wait(), the execution pauses. The resolution flow is:
- Resolve the wait with `client.executions.input(exec_id, wait=..., value=...)`, CLI `kitaru executions input`, or MCP `kitaru_executions_input`.
- Abort the wait with `client.executions.abort_wait(exec_id, wait=...)`.
- Use `client.executions.resume(exec_id)` or `kitaru executions resume` as a manual fallback.

`input` resolves the wait; `resume` is a separate operation for paused executions that didn't auto-continue.
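The resolution steps above can be sketched as one small function. This is an illustration under assumptions: `resolve_wait` and `RecordingExecutions` are hypothetical names, the `executions` argument is assumed to behave like `KitaruClient().executions`, and passing the wait name as `wait=` is an assumption. Only the three calls named above are used.

```python
from typing import Any


def resolve_wait(executions: Any, exec_id: str, wait: str, *, value: Any = None,
                 abort: bool = False, manual_resume: bool = False) -> list[str]:
    """Resolve or abort one pending wait and return the calls that were made."""
    if abort:
        executions.abort_wait(exec_id, wait=wait)
        return ["abort_wait"]
    executions.input(exec_id, wait=wait, value=value)
    calls = ["input"]
    if manual_resume:
        # Fallback only: for executions that stayed paused after input.
        executions.resume(exec_id)
        calls.append("resume")
    return calls


class RecordingExecutions:
    """Stand-in for the executions namespace so the sketch runs without a server."""

    def __init__(self) -> None:
        self.log: list[tuple] = []

    def input(self, exec_id: str, *, wait: str, value: Any) -> None:
        self.log.append(("input", exec_id, wait, value))

    def abort_wait(self, exec_id: str, *, wait: str) -> None:
        self.log.append(("abort_wait", exec_id, wait))

    def resume(self, exec_id: str) -> None:
        self.log.append(("resume", exec_id))


fake = RecordingExecutions()
print(resolve_wait(fake, "exec-123", "approve_draft", value=True))
print(fake.log)
```

Keeping `resume` behind an explicit flag mirrors the distinction in the text: input is the normal path, and resume is a separate manual fallback.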
Use the surface that matches the job instead of assuming everything is available in every interface.
Python SDK

- `wait`, `log`, `save`, `load`, and `llm`
- `configure(...)`, `connect(server_url, ...)`, `list_stacks()`, `current_stack()`, `use_stack()`, `create_stack(...)` (local stacks only), `delete_stack(...)`
- `create_secret(...)`, `delete_secret(...)`, and `get_secret(...)` for Kitaru-native secret writes/reads
- `current_execution_id()` inside active runs when code needs the execution ID for downstream references
- `flow.run(...)`, `flow.replay(...)`

KitaruClient

The client is for managing existing executions and for artifact inspection, not for launching new executions.
- `executions.get / list / latest / logs / pending_waits / input / abort_wait / retry / resume / replay / cancel`
- `artifacts.list / get`

CLI

- `login`, `logout`, `status`, `info` (`--all`, `--file`, JSON/YAML export)
- `clean project / global / all` for safe local-state reset (`--dry-run` first)
- `analytics opt-in / opt-out / status`
- `auth token` for a short-lived bearer token from the active connection
- `log-store set / show / reset`
- `stack list / current / show / use / create / delete`
  - `stack create` supports `local`, `kubernetes`, `vertex`, `sagemaker`, `azureml` (remote stack creation is CLI/MCP only, not available in the Python SDK `create_stack()`)
  - `--extra` for component overrides, `--async` for async provisioning
- `model register / list`
- `secrets set / show / list / delete`
- `build`, `deploy`, `invoke`, `flow deployments list/show/delete/logs/curl`, and `flow tag` / `flow untag`
- `executions get / list / logs / input / replay / retry / resume / cancel`
- `--page` / `--size` pagination where documented; `--limit` is a first-page shortcut for compatible lists
- `--output json` / `-o json` emits `{command, item}` for single-item commands, `{command, items, count}` for lists, and JSONL event objects for `executions logs --follow --output json`

MCP

- `kitaru_executions_list`, `kitaru_executions_get`, `kitaru_executions_latest`
- `get_execution_logs`
- `kitaru_executions_run` (target format: `<module_or_file>:<flow_name>`)
- `kitaru_executions_input`, `kitaru_executions_retry`, `kitaru_executions_replay`, `kitaru_executions_cancel`
- `kitaru_deployments_deploy`, `kitaru_deployments_invoke`, `kitaru_deployments_list`, `kitaru_deployments_get`, `kitaru_deployments_delete`, `kitaru_deployments_tag`, `kitaru_deployments_untag`
- `kitaru_artifacts_list`, `kitaru_artifacts_get`
- `kitaru_secrets_create` (metadata-only secret creation; no MCP delete tool)
- `kitaru_start_local_server`, `kitaru_stop_local_server`, `kitaru_status`, `kitaru_stacks_list`
- `manage_stack` (create/delete; supports `local`, `kubernetes`, `vertex`, `sagemaker`, `azureml`, plus `extra` and `async_mode`)

| Capability | SDK | KitaruClient | CLI | MCP |
|---|---|---|---|---|
| Launch new execution | Yes (flow object / Python entrypoint) | No | No top-level run command | Yes (kitaru_executions_run) |
| Inspect execution | Limited (FlowHandle) | Yes | Yes | Yes |
| Resolve wait input | No | Yes | Yes | Yes |
| Abort wait | No | Yes (abort_wait) | No | No |
| Resume paused execution | No | Yes | Yes | No |
| Replay execution | Yes (flow object) | Yes | Yes | Yes |
| Browse artifacts | No | Yes | No | Yes |
| List pending waits | No | Yes (pending_waits) | No | No |
| Create local stack | Yes | No | Yes | Yes |
| Create remote stack | No | No | Yes | Yes |
| Switch active stack | Yes | No | Yes | No |
| Deploy flow version | No (use CLI/server APIs) | Limited deployment namespace | Yes | Yes |
| Invoke deployment | No (use deployment endpoint/client) | Yes | Yes | Yes |
| Create secret | Yes | No | Yes | Yes (metadata only) |
| Delete secret | Yes | No | Yes | No |
| Print auth token / curl command | No | No | Yes | No |
| Clean/reset local state | No | No | Yes | No |
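When an agent has to pick a surface programmatically, the table above can be encoded as a small lookup. The sketch below is illustrative only: `CAPABILITIES` copies a subset of rows from the table (with "Yes (…)" cells reduced to `True`), and `surfaces_for` is a hypothetical helper name, not a Kitaru API.

```python
# Column order matches the table: SDK, KitaruClient, CLI, MCP.
SURFACES = ("SDK", "KitaruClient", "CLI", "MCP")

# Subset of rows copied from the capability table above.
CAPABILITIES = {
    "Launch new execution": (True, False, False, True),
    "Resolve wait input": (False, True, True, True),
    "Abort wait": (False, True, False, False),
    "Resume paused execution": (False, True, True, False),
    "Replay execution": (True, True, True, True),
    "Browse artifacts": (False, True, False, True),
    "List pending waits": (False, True, False, False),
    "Create remote stack": (False, False, True, True),
    "Delete secret": (True, False, True, False),
}


def surfaces_for(capability: str) -> list[str]:
    """Return the surfaces that offer a capability, in table column order."""
    row = CAPABILITIES[capability]
    return [surface for surface, available in zip(SURFACES, row) if available]


print(surfaces_for("Abort wait"))
print(surfaces_for("Create remote stack"))
```

A lookup like this makes the "do not assume every interface can do everything" rule checkable in code review or in an agent's planning step.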
Use Kitaru configuration helpers instead of inventing custom runtime wiring.
- `configure(...)` sets local execution defaults
- `connect(server_url, ...)` connects to a server via URL (Python SDK surface)
- `kitaru login` connects to a server URL or a managed workspace by name/ID (CLI surface, broader than `connect()`)
- `list_stacks()`, `current_stack()`, `use_stack()` and `kitaru stack ...` help choose the active execution stack
- `create_stack(...)` in the SDK creates local stacks only; use CLI (`kitaru stack create`) or MCP (`manage_stack`) for remote stacks (`kubernetes`, `vertex`, `sagemaker`, `azureml`)
- `model register / list` manage local model aliases used by `llm(...)`; alias registries are transported into submitted/replayed runs via `KITARU_MODEL_REGISTRY`
- `secrets set / show / list / delete` manage secret values used by aliases
- `create_secret(...)` / `delete_secret(...)` are the Python SDK write helpers; `kitaru_secrets_create` is the MCP metadata-only create path
- `kitaru auth token` prints a short-lived bearer token for raw HTTP calls
- `kitaru flow deployments curl FLOW` generates a copy-pasteable curl command that starts a deployment execution without inlining real token values

Use adapters when the agent framework already owns an inner runtime. Kitaru then needs a clear seam where it can put durable checkpoints without pretending to control side effects it cannot see.
`KitaruAgent`

Public surface to reach for in new code:

- `KitaruAgent(agent, *, name=None, capture=CapturePolicy(...), granular_checkpoints=True, ...)`
- `CapturePolicy(tool_capture="full" | "metadata" | None, tool_capture_overrides={...})`
- `wait_for_input(...)` and `hitl_tool(...)` for human input from tool context
- `KitaruToolset`, `KitaruFunctionToolset`, `KitaruMCPServer`, `kitaruify_toolset(...)`, and `kitaruify_mcp_server(...)` for lower-level durable tool surfaces

`wrap(...)` is still exported only as a deprecated compatibility shim. Do not show it as the normal path for new code.
Key implementation rules:
- `granular_checkpoints=False` switches to one turn checkpoint per agent run.
- Inside an explicit `@checkpoint`, `KitaruAgent` runs as a passthrough so the explicit checkpoint is the replay boundary.
- `wait_for_input(...)` is a wrapper around `kitaru.wait(...)`; it still has to create the wait at flow scope. In granular mode, opt regular waiting tools out with `tool_checkpoint_config_by_name={"tool_name": False}` or use `@hitl_tool` for pure wait tools.
- `tool_capture` accepts `"full"`, `"metadata"`, or `None`.
- `run_stream()` and `iter()` return context managers and need explicit checkpointing; streamed turns can fall back from granular to turn behavior.

Safe default pattern for explicit flows:
```python
import kitaru
from pydantic_ai import Agent

from kitaru.adapters.pydantic_ai import CapturePolicy, KitaruAgent

agent = Agent("openai:gpt-4o", name="researcher")
durable_agent = KitaruAgent(
    agent,
    capture=CapturePolicy(tool_capture="full"),
)


@kitaru.checkpoint
def run_agent(prompt: str) -> str:
    # The explicit checkpoint is the replay boundary; the agent runs as a passthrough.
    return durable_agent.run_sync(prompt).output


@kitaru.flow
def my_flow(topic: str) -> str:
    return run_agent(f"Research {topic}")
```
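Because `tool_capture` only takes `"full"`, `"metadata"`, or `None`, a small guard can catch the spellings this guide lists as mistakes (`metadata_only`, `off`) before a `CapturePolicy` is built. This is a sketch only: `check_tool_capture` is a hypothetical helper, not part of the adapter, and the suggested replacements simply follow the pairing given in this guide.

```python
from typing import Optional

# Values the guide lists as accepted for tool_capture.
VALID_TOOL_CAPTURE = ("full", "metadata", None)

# Spellings the guide lists as mistakes, paired with the value to use instead.
REPLACEMENTS = {"metadata_only": "metadata", "off": None}


def check_tool_capture(value: Optional[str]) -> Optional[str]:
    """Return `value` unchanged if it is accepted; otherwise raise with a hint."""
    if value in VALID_TOOL_CAPTURE:
        return value
    if value in REPLACEMENTS:
        raise ValueError(
            f"{value!r} is not accepted; use {REPLACEMENTS[value]!r} instead"
        )
    raise ValueError(f"unknown tool_capture value: {value!r}")


print(check_tool_capture("metadata"))
```

The guard raises instead of silently translating old values, so the calling code gets fixed at the source.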
`KitaruRunner`

Use `KitaruRunner` for OpenAI Agents SDK agents.

- `checkpoint_strategy="runner_call"` places one checkpoint around the outer runner call. Prefer it when the flow needs a clean `.wait()` return value.
- `checkpoint_strategy="calls"` is the default granular mode: supported model/tool calls become separate checkpoints. This is useful for fine replay, but it can create multiple terminal checkpoints, so `flow.run(...).wait()` may raise the ambiguous-result error. Inspect artifacts/UI/client output instead, or choose `runner_call`.
- `OpenAIRunRequest.start(...)` and `OpenAIRunRequest.resume(...)` carry start and resume state.
- `wait_for_approval(...)` bridges an interrupted OpenAI run into a normal flow-scope Kitaru wait and returns a resume request.
- `OpenAICapturePolicy` controls saved input/output/run-state/interruption/usage details. Use tool checkpoint overrides for side-effectful tools.
- `calls` mode must run at flow scope, not inside another checkpoint.

`KitaruGraphRunner`

Use `KitaruGraphRunner` for LangGraph graphs and LangChain/Deep Agents objects that behave like LangGraph runnables.
- `checkpoint_strategy="graph_call"` is the default coarse boundary: one Kitaru checkpoint per outer `invoke(...)` / `ainvoke(...)` call.
- `checkpoint_strategy="calls"` creates true sync model/tool checkpoints only when `KitaruLangGraphMiddleware` wraps the LangChain handler call. Callbacks and event streams are trace-only; they are not replay boundaries.
- `async_checkpoint_policy` is not a hidden switch for true async checkpoints.
- If the graph uses `InMemorySaver`, treat it as a local LangGraph checkpointer, not durable Kitaru state.
- `wait_for_interrupt(...)` bridges LangGraph interrupts to flow-scope `kitaru.wait(...)` and returns a resume request.
- `LangGraphCapturePolicy` defaults to metadata-first summaries; saving full state values can persist prompts, tool outputs, or customer data.

`KitaruClaudeRunner`

Use `KitaruClaudeRunner` when one Claude SDK invocation should be durable.
- `checkpoint_strategy="invocation"` is the only supported strategy and is the default. `"calls"`, `"runner_call"`, `"model_call"`, and `"tool_call"` are rejected because the adapter does not provide granular Claude-internal replay.
- Call `runner.run(...)` / `runner.run_sync(...)` directly in the flow body so the adapter can create its invocation checkpoint. Calling from inside an existing checkpoint is rejected unless you explicitly opt into direct execution and accept replay risk.
- `ClaudeRunRequest` carries prompt/options such as cwd, session resume ID, and max turns. `ClaudeCapturePolicy` controls saved messages/transcripts/usage and manifest details.

`KitaruGeminiInteractionsRunner`

Use `KitaruGeminiInteractionsRunner` when one stable Gemini Interactions response should be durable. Use the public module `kitaru.adapters.gemini` and keep the user-facing adapter name as Gemini Interactions. Treat Antigravity as a managed-agent/preset use case, not as the core adapter identity.
- `checkpoint_strategy="interaction"` is the supported boundary: one stable Gemini interaction response becomes one Kitaru checkpoint. Stable statuses are `completed` and `requires_action`.
- `GeminiInteractionRequest.start(...)`, `.resume(...)`, `.function_result(...)`, `.poll(...)`, and `.antigravity(...)` describe the interaction turn. Poll an existing unfinished interaction by ID instead of creating a duplicate job.
- Treat `requires_action` as a handoff back to the Kitaru flow. Run local tool work or `kitaru.wait(...)` at flow scope, then send a later `function_result` request.
- Set `cache_identity` when project, region, credentials, or client configuration can change what the same logical request means.
- Review `GeminiInteractionCapturePolicy` before saving raw prompts, provider payloads, steps, or outputs, because those values can contain user data.

Common mistakes to avoid:

- Calling `my_flow(...)` directly instead of `my_flow.run(...)`
- Calling `wait()` inside a checkpoint
- Calling `llm()` outside a `@flow`
- Reusing names so that `load()` becomes ambiguous
- Using `wait.*` override keys in replay (they are not supported)
- Using `KitaruClient` to launch new executions (it's for inspection/control only)
- Calling `connect(...)` and expecting managed workspace support (use `kitaru login` for that)
- Using `create_stack(...)` for remote stacks (it's local-only; use CLI/MCP)
- Using `wrap(...)` for new code instead of `KitaruAgent(...)`
- Using `metadata_only` or `off` instead of `"metadata"` or `None`
- Expecting `checkpoint_strategy="calls"` to produce one clean `.wait()` result
- Wrapping a `calls` runner call inside your own checkpoint
- Treating `InMemorySaver` as durable cross-process storage
- Expecting `KitaruClaudeRunner` to replay Claude-internal Bash, MCP, custom-tool, hook, permission, or workspace side effects granularly
- Keeping `requires_action` work inside the provider-owned interaction
  instead of returning local tool or human work to Kitaru flow scope
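The `requires_action` handoff described for the Gemini Interactions adapter can be pictured as a small routing decision made at flow scope. The sketch below is an assumption-laden illustration: `next_step` and its return labels are hypothetical, and only the two stable statuses named in this guide (`completed`, `requires_action`) are taken from the text.

```python
# Stable statuses named in this guide; everything else is treated as unfinished.
STABLE_STATUSES = frozenset({"completed", "requires_action"})


def next_step(status: str) -> str:
    """Map an interaction status to what the Kitaru flow should do next."""
    if status == "completed":
        return "return_result"
    if status == "requires_action":
        # Hand off to flow scope: run local tool work or kitaru.wait(...),
        # then send a later function_result request.
        return "handoff_to_flow"
    # Unfinished: poll the same interaction by ID rather than starting a duplicate.
    return "poll_existing"


for status in ("completed", "requires_action", "in_progress"):
    print(status, "->", next_step(status))
```

The status `in_progress` in the loop is only a placeholder for "any non-stable status"; the point is that the local tool or human work happens in the flow, not inside the provider-owned interaction.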