Implementing SagaOrchestrator for Reversible Agent Operations in the Agent Governance Toolkit
The Agent Governance Toolkit provides a lightweight SagaOrchestrator that executes multi-step agent workflows sequentially and automatically rolls back committed steps via compensation APIs when failures occur.
The microsoft/agent-governance-toolkit includes a production-ready implementation of the Saga pattern specifically designed for orchestrating reversible AI agent operations. This SagaOrchestrator enables developers to compose complex, distributed workflows where each step can be undone if subsequent operations fail, maintaining system consistency across autonomous agent networks.
Core Components of the SagaOrchestrator
The implementation resides in the agent-hypervisor package and consists of three primary components that work together to manage distributed transactions.
SagaOrchestrator Class
The SagaOrchestrator class in agent-hypervisor/src/hypervisor/saga/orchestrator.py serves as the primary entry point for saga management. It provides methods to create sagas, register steps, execute workflows, and trigger compensation sequences. The orchestrator maintains an in-memory registry of active sagas and enforces sequential execution semantics while handling retries and timeouts through asyncio primitives.
State Machine Definitions
The agent-hypervisor/src/hypervisor/saga/state_machine.py file defines the finite-state machines that govern both saga-level and step-level transitions. The Saga class tracks the overall workflow state (RUNNING, COMPENSATING, COMPLETED, ESCALATED). The SagaStep class manages individual step states (PENDING, EXECUTING, COMMITTED, FAILED, plus compensation states such as COMPENSATION_FAILED). These classes enforce valid state transitions and record execution metadata for observability: the started_at and completed_at timestamps and a retry_count counter.
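The transition rules above can be illustrated with a small, self-contained sketch. StepState, ALLOWED, InvalidTransition, and StepRecord are hypothetical stand-ins, not the toolkit's classes. The compensation states and the exact set of legal moves are assumptions drawn from the lifecycle this article describes. retry_count is omitted for brevity.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Optional, Set


class StepState(Enum):
    # Hypothetical stand-in for the toolkit's StepState. The compensation
    # states are assumptions based on the lifecycle described in this article.
    PENDING = "pending"
    EXECUTING = "executing"
    COMMITTED = "committed"
    FAILED = "failed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    COMPENSATION_FAILED = "compensation_failed"


# Every legal move; anything absent from this table is rejected.
ALLOWED: Dict[StepState, Set[StepState]] = {
    StepState.PENDING: {StepState.EXECUTING},
    StepState.EXECUTING: {StepState.COMMITTED, StepState.FAILED},
    # COMMITTED -> COMPENSATION_FAILED covers a step with no undo_api.
    StepState.COMMITTED: {StepState.COMPENSATING, StepState.COMPENSATION_FAILED},
    StepState.COMPENSATING: {StepState.COMPENSATED, StepState.COMPENSATION_FAILED},
    StepState.FAILED: set(),
    StepState.COMPENSATED: set(),
    StepState.COMPENSATION_FAILED: set(),
}


class InvalidTransition(Exception):
    """Raised when a state change is not listed in ALLOWED."""


@dataclass
class StepRecord:
    """Hypothetical step record that stamps metadata on each transition."""
    state: StepState = StepState.PENDING
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    def move_to(self, target: StepState) -> None:
        if target not in ALLOWED[self.state]:
            raise InvalidTransition(f"{self.state.name} -> {target.name}")
        now = datetime.now(timezone.utc)
        if target is StepState.EXECUTING:
            self.started_at = now
        elif target in (StepState.COMMITTED, StepState.FAILED):
            self.completed_at = now
        self.state = target


step = StepRecord()
step.move_to(StepState.EXECUTING)
step.move_to(StepState.COMMITTED)
try:
    step.move_to(StepState.EXECUTING)  # a committed step cannot run again
except InvalidTransition as exc:
    print(f"rejected: {exc}")  # rejected: COMMITTED -> EXECUTING
```

Keeping the legal moves in one table makes the state machine easy to audit. It also turns an illegal transition into an immediate error rather than silent corruption.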
Configuration Constants
Default thresholds for retry logic and timeouts are centralized in agent-hypervisor/src/hypervisor/constants.py. The orchestrator references DEFAULT_RETRY_DELAY_SECONDS to implement exponential back-off strategies and respects per-step max_retries configurations when transient failures occur.
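As a rough illustration of an exponential back-off schedule, the sketch below doubles the wait before each retry. The value assigned to DEFAULT_RETRY_DELAY_SECONDS is an assumption; check constants.py for the real default. The max_delay cap is a common safeguard added here, not something this article attributes to the toolkit.

```python
from typing import List

# Assumed value for illustration; the real default lives in constants.py.
DEFAULT_RETRY_DELAY_SECONDS = 1.0


def backoff_delays(max_retries: int,
                   base: float = DEFAULT_RETRY_DELAY_SECONDS,
                   max_delay: float = 30.0) -> List[float]:
    """Wait before retry n (0-based) is base * 2**n, capped at max_delay."""
    return [min(base * (2 ** n), max_delay) for n in range(max_retries)]


print(backoff_delays(3))  # [1.0, 2.0, 4.0]
```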
Saga Lifecycle and State Management
A saga begins in the RUNNING state when created via SagaOrchestrator.create_saga(session_id). As the orchestrator successfully commits steps, they transition to COMMITTED status and populate the saga's committed_steps_reversed list.
When a step fails and compensation is triggered through compensate(), the orchestrator shifts the saga to COMPENSATING status. It then iterates through committed_steps_reversed, most recent commit first, and invokes each step's compensation handler. If every compensation succeeds, the saga ends in COMPLETED. If any compensation fails, the saga escalates to ESCALATED and records joint-liability errors for manual intervention.
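The rollback loop described above can be sketched as follows. Step and compensate are hypothetical, stdlib-only stand-ins rather than the toolkit's SagaStep and SagaOrchestrator.compensate, and states are plain strings for brevity. The sketch makes two assumptions:

- A step registered without an undo_api is marked COMPENSATION_FAILED.
- After a failure, the loop keeps undoing the remaining steps instead of stopping at the first failure.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional, Tuple


@dataclass
class Step:
    # Hypothetical minimal step, not the toolkit's SagaStep.
    action_id: str
    undo_api: Optional[str]
    state: str = "COMMITTED"


async def compensate(
    committed: List[Step],
    compensator: Callable[[Step], Awaitable[object]],
) -> Tuple[str, List[Step]]:
    """Undo committed steps newest-first; return (saga_state, failed_steps)."""
    failed: List[Step] = []
    for step in reversed(committed):  # `committed` is in commit order
        if step.undo_api is None:
            # Nothing can reverse this step, so it cannot be compensated.
            step.state = "COMPENSATION_FAILED"
            failed.append(step)
            continue
        try:
            await compensator(step)
            step.state = "COMPENSATED"
        except Exception:
            step.state = "COMPENSATION_FAILED"
            failed.append(step)
    # Any failure leaves the system partially rolled back: escalate.
    return ("ESCALATED" if failed else "COMPLETED"), failed


async def demo() -> Tuple[str, List[str]]:
    calls: List[str] = []

    async def compensator(step: Step) -> None:
        calls.append(step.undo_api)  # stand-in for calling the undo API

    steps = [Step("charge-card", "payment.refund"),
             Step("reserve-inventory", "inventory.release")]
    state, _ = await compensate(steps, compensator)
    return state, calls


state, calls = asyncio.run(demo())
print(state, calls)  # COMPLETED ['inventory.release', 'payment.refund']
```

Continuing past a failed compensation undoes as much as possible before escalating. Stopping at the first failure is the other reasonable policy.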
Step Execution and Compensation
Each SagaStep moves through a strict lifecycle: PENDING → EXECUTING → COMMITTED on success, or EXECUTING → FAILED on error or timeout. Steps can optionally specify an undo_api parameter during registration. If it is provided, the orchestrator invokes this API during compensation. If it is absent, the step transitions to COMPENSATION_FAILED and forces the saga to escalate.
The SagaOrchestrator.execute_step() method wraps executor functions with retry logic and timeout handling. Developers supply executor callables (zero-argument functions that return a coroutine) that implement the actual agent communication. The orchestrator manages state transitions and bookkeeping in its in-memory registry.
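Here is a minimal sketch of such a wrapper, using only asyncio. run_with_retries and StepFailed are hypothetical names. Unlike the toolkit's execute_step, this sketch does not record any step state. It does show why the executor is passed as a callable: a coroutine object can only be awaited once, so each retry needs a new one.

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple


class StepFailed(Exception):
    """Raised once every attempt has failed or timed out."""


async def run_with_retries(
    executor: Callable[[], Awaitable[Any]],
    timeout: float,
    max_retries: int,
    base_delay: float,
) -> Tuple[Any, int]:
    """Await executor() up to 1 + max_retries times; return (result, retry_count)."""
    retry_count = 0
    while True:
        try:
            # executor() builds a fresh coroutine per attempt; a coroutine
            # object cannot be awaited twice, hence the callable.
            result = await asyncio.wait_for(executor(), timeout=timeout)
            return result, retry_count
        except Exception as exc:  # asyncio.TimeoutError is an Exception too
            if retry_count >= max_retries:
                raise StepFailed(f"gave up after {retry_count} retries") from exc
            # Exponential back-off: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * (2 ** retry_count))
            retry_count += 1


# Usage: an executor that fails twice, then succeeds.
attempts = {"n": 0}


async def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient error")
    return "ok"


result, retries = asyncio.run(
    run_with_retries(flaky, timeout=1.0, max_retries=3, base_delay=0.01)
)
print(result, retries)  # ok 2
```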
Timeout Handling and Retries
Execution and compensation routines are wrapped in asyncio.wait_for() using thresholds defined in the constants module. Because asyncio multitasking is cooperative, a coroutine that stays CPU-bound (or makes a blocking call) without yielding control freezes the event loop. The timeout therefore cannot fire until the coroutine next yields. To keep timeouts responsive, offload blocking work to a worker thread, for example with asyncio.to_thread(). Python threads cannot be forcibly stopped, so hard-kill semantics require running the work in a separate process that can be terminated.
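To make the cooperative behaviour concrete, here is a small stdlib-only experiment (not toolkit code). The same 0.5 s blocking call runs twice under a 0.1 s asyncio.wait_for() timeout: once directly on the event loop and once via asyncio.to_thread(). hogs_the_loop adds an await after the blocking call, which gives the timeout a yield point at which it can be delivered.

```python
import asyncio
import time
from typing import Awaitable, Callable, Tuple


def blocking_work(seconds: float) -> None:
    time.sleep(seconds)  # blocks its thread; never yields to the event loop


async def hogs_the_loop() -> None:
    blocking_work(0.5)      # the event loop is frozen for 0.5 s
    await asyncio.sleep(1)  # first yield point: the timeout is delivered here


async def offloads_to_thread() -> None:
    # asyncio.to_thread (Python 3.9+) runs the call in a worker thread,
    # leaving the event loop free to fire the timeout on schedule.
    await asyncio.to_thread(blocking_work, 0.5)


async def seconds_until_timeout(factory: Callable[[], Awaitable[None]],
                                timeout: float) -> float:
    """Measure how long wait_for takes to give up on factory()."""
    start = time.perf_counter()
    try:
        await asyncio.wait_for(factory(), timeout=timeout)
    except asyncio.TimeoutError:
        pass
    return time.perf_counter() - start


async def main() -> Tuple[float, float]:
    blocked = await seconds_until_timeout(hogs_the_loop, timeout=0.1)
    offloaded = await seconds_until_timeout(offloads_to_thread, timeout=0.1)
    return blocked, offloaded


blocked, offloaded = asyncio.run(main())
print(f"blocking call on the loop: timed out after {blocked:.2f}s")
print(f"asyncio.to_thread:         timed out after {offloaded:.2f}s")
```

Note that asyncio.to_thread() only stops *waiting*: the worker thread still runs to completion in the background, and asyncio.run() waits for it at shutdown. That is why forced termination needs a separate process.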
Retried steps respect the per-step max_retries configuration and implement exponential back-off based on DEFAULT_RETRY_DELAY_SECONDS. Each retry attempt increments the step's retry_count attribute for telemetry purposes.
Complete Implementation Example
The following example demonstrates creating a saga, registering steps with undo APIs, executing the workflow, and triggering compensation when a step fails:

```python
import asyncio

from hypervisor.saga.orchestrator import SagaOrchestrator

# ----------------------------------------------------------------------
# 1️⃣ Initialise the orchestrator
# ----------------------------------------------------------------------
orch = SagaOrchestrator()

# ----------------------------------------------------------------------
# 2️⃣ Create a saga for a particular session (e.g., a user request)
# ----------------------------------------------------------------------
saga = orch.create_saga(session_id="order-123")
print(f"Created saga {saga.saga_id}")

# ----------------------------------------------------------------------
# 3️⃣ Register steps – each step knows its execute and undo API names
# ----------------------------------------------------------------------
# Step A – charge credit card
step_a = orch.add_step(
    saga_id=saga.saga_id,
    action_id="charge-card",
    agent_did="did:example:payment-gateway",
    execute_api="payment.charge",
    undo_api="payment.refund",  # compensation
)
# Step B – reserve inventory
step_b = orch.add_step(
    saga_id=saga.saga_id,
    action_id="reserve-inventory",
    agent_did="did:example:inventory",
    execute_api="inventory.reserve",
    undo_api="inventory.release",
)

# ----------------------------------------------------------------------
# 4️⃣ Define async executors that call the real APIs (mocked here)
# ----------------------------------------------------------------------
async def exec_step(step):
    # In a real system you would invoke the step's execute API on the
    # target agent (e.g., via RPC). Here step B fails, so the card charge
    # that step A already committed has to be rolled back.
    if step.action_id == "reserve-inventory":
        raise RuntimeError("Out of stock")
    return f"{step.action_id} succeeded"


async def compensator(step):
    # Simulate calling the undo API; return a simple message.
    return f"{step.undo_api} called for {step.action_id}"

# ----------------------------------------------------------------------
# 5️⃣ Execute steps sequentially
# ----------------------------------------------------------------------
async def run_saga():
    try:
        # Step A commits. execute_step takes a zero-argument callable, so a
        # fresh coroutine can be created for each attempt.
        await orch.execute_step(saga.saga_id, step_a.step_id,
                                lambda: exec_step(step_a))
        # Step B raises; the error propagates out of execute_step.
        await orch.execute_step(saga.saga_id, step_b.step_id,
                                lambda: exec_step(step_b))
    except Exception as exc:
        print(f"Step failed: {exc}")
        # Roll back every committed step (here only step A) in reverse order.
        failed_comp = await orch.compensate(saga.saga_id, compensator)
        if failed_comp:
            print(f"Compensation failures: {[s.step_id for s in failed_comp]}")
        else:
            print("All compensations succeeded")

    # Inspect the final saga state
    final = orch.get_saga(saga.saga_id)
    print(f"Saga final state: {final.state.value}")


# Run the demo
if __name__ == "__main__":
    asyncio.run(run_saga())
```

This implementation shows how orch.create_saga generates a unique saga_id, how add_step registers executable actions with their corresponding undo APIs, and how execute_step manages the execution lifecycle. Step A (charge-card) commits, then step B (reserve-inventory) raises, and the error propagates out of execute_step. The except block calls compensate, which transitions the saga to COMPENSATING and invokes the compensator for each committed step in reverse order. Here that means calling payment.refund for step A. If every compensation succeeds, the saga ends in COMPLETED; otherwise it is ESCALATED.
Key Source Files and References
- agent-hypervisor/src/hypervisor/saga/orchestrator.py: Core orchestrator implementation containing the create_saga, add_step, execute_step, and compensate methods.
- agent-hypervisor/src/hypervisor/saga/state_machine.py: Definitions for Saga, SagaStep, SagaState, and StepState with transition validation logic.
- agent-hypervisor/src/hypervisor/constants.py: Default configuration values including retry delays and timeout thresholds.
- docs/tutorials/11-saga-orchestration.md: High-level developer tutorial covering saga concepts and API usage patterns.
- agent-hypervisor/tutorials/saga-compensation/demo.py: End-to-end demonstration of a travel-booking saga with simulated payment failures.
Summary
- The SagaOrchestrator in orchestrator.py provides an API for creating reversible multi-step agent workflows.
- State machines enforce valid transitions between saga states (RUNNING, COMPENSATING, COMPLETED, ESCALATED) and step states (PENDING, EXECUTING, COMMITTED, FAILED) while recording execution metadata.
- Compensation proceeds in reverse order through committed_steps_reversed, invoking each step's optional undo_api. A committed step without one forces escalation.
- Retry logic respects per-step max_retries and uses exponential back-off based on DEFAULT_RETRY_DELAY_SECONDS.
- Timeout handling uses asyncio.wait_for(), so CPU-bound or blocking work must be offloaded: to a thread for prompt timeouts, or to a process for hard-kill semantics.
- Full observability is available through state objects that integrate with OpenTelemetry via the hypervisor's SagaSpanExporter.
Frequently Asked Questions
What happens if a compensation step fails during rollback?
If a compensation step fails, the orchestrator marks that specific step as COMPENSATION_FAILED and transitions the entire saga to the ESCALATED state. The system records a joint-liability error containing the step ID and failure details, requiring manual intervention since the system cannot automatically resolve partial compensation failures.
How does the SagaOrchestrator handle timeouts?
The orchestrator wraps all execution and compensation coroutines in asyncio.wait_for() using configurable timeouts from the constants module. However, asyncio multitasking is cooperative: CPU-bound or blocking operations that hold the event loop delay timeout detection until they yield. For critical timeouts, offload blocking work to a thread so the deadline is observed promptly. If the work must actually be terminated, use a separate process instead.
Can I implement a SagaOrchestrator workflow without defining undo APIs?
Yes, but steps without an undo_api parameter cannot be compensated. If such a step has been committed when a subsequent failure occurs, the orchestrator marks it as COMPENSATION_FAILED and immediately escalates the saga to ESCALATED status. Omit undo_api only for steps with no side effects to reverse (for example, read-only lookups), or when system consistency can tolerate unrecoverable states.
How do I monitor saga execution progress?
The Saga and SagaStep objects expose state, error, started_at, completed_at, and retry_count attributes that reflect real-time execution status. These objects integrate with the hypervisor's OpenTelemetry SagaSpanExporter for distributed tracing, allowing you to track saga progress through your observability platform by inspecting span attributes and events.
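As an illustration of consuming those attributes, the sketch below turns per-step metadata into log lines. StepView is a hypothetical stand-in that uses the attribute names listed above. With the toolkit you would read the same fields from its SagaStep objects, where state may be an enum rather than a string. The sketch does not use the OpenTelemetry integration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class StepView:
    # Hypothetical stand-in exposing the attributes described above.
    step_id: str
    state: str
    error: Optional[str] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    retry_count: int = 0


def summarize(steps: List[StepView]) -> List[str]:
    """One log line per step: state, duration, retries, and any error."""
    lines = []
    for s in steps:
        if s.started_at and s.completed_at:
            duration = f"{(s.completed_at - s.started_at).total_seconds():.2f}s"
        else:
            duration = "n/a"  # step has not started or not finished
        line = f"{s.step_id}: {s.state} in {duration}, retries={s.retry_count}"
        if s.error:
            line += f", error={s.error}"
        lines.append(line)
    return lines


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
steps = [
    StepView("charge-card", "COMMITTED",
             started_at=t0, completed_at=t0 + timedelta(seconds=1.5)),
    StepView("reserve-inventory", "FAILED", error="Out of stock",
             started_at=t0 + timedelta(seconds=2),
             completed_at=t0 + timedelta(seconds=3.25), retry_count=2),
]
for line in summarize(steps):
    print(line)
```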