How to Build Graph-Based Workflows with WorkflowBuilder and Edge Conditions in Agent Framework
The Agent Framework's WorkflowBuilder class enables declarative construction of directed execution graphs by connecting executor nodes with conditional edges that filter message flow based on runtime payload evaluation.
The microsoft/agent-framework repository models workflows as directed graphs in which executors (or agents wrapped as executors) process messages and edges define the routing paths. The WorkflowBuilder class in _workflow_builder.py is the central entry point for assembling these graphs declaratively, so you can build data-driven routing without writing a custom execution engine.
Core WorkflowBuilder API Methods
The WorkflowBuilder class provides a fluent interface for constructing workflow graphs. According to the source code in _workflow_builder.py, the following methods define the graph topology:
- WorkflowBuilder(start_executor=...): Declares the entry-point executor that receives initial input.
- add_edge(source, target): Creates a single directed edge between two executors for unconditional message forwarding.
- add_edge(source, target, condition=my_predicate): Creates a conditional edge where my_predicate filters messages; only payloads evaluating to True traverse the edge.
- add_fan_out_edges(source, [t1, t2, …]): Creates a FanOutEdgeGroup broadcasting one source to multiple targets.
- add_fan_in_edges([s1, s2, …], target): Creates a FanInEdgeGroup merging multiple upstream executors into a single target.
- add_switch_case_edge_group(source, [Case(...), Default(...)]): Implements switch/case routing where the first matching predicate determines the target.
- add_multi_selection_edge_group(source, targets, selection_func): Routes to a subset of targets based on custom selection logic.
- add_chain([ex1, ex2, …]): Convenience method for building linear chains (ex1 → ex2 → …).
- build(): Validates connectivity, type-compatibility, and duplicate IDs, returning an immutable Workflow ready for execution.
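To make the add_chain() entry concrete, here is a minimal pure-Python sketch of the topology a linear chain implies. The helper chain_to_edges is hypothetical and is not part of the framework; it only models the idea that a chain of executors corresponds to one edge per adjacent pair.

```python
def chain_to_edges(executor_ids: list[str]) -> list[tuple[str, str]]:
    """Expand a linear chain into the (source, target) pairs it implies.

    Hypothetical helper for illustration only; it does not call the framework.
    """
    # Pair every executor with its immediate successor.
    return list(zip(executor_ids, executor_ids[1:]))


edges = chain_to_edges(["ex1", "ex2", "ex3"])
print(edges)
```

Under that reading, a chain of three executors is shorthand for two add_edge() calls.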
Understanding Edge Conditions
An EdgeCondition is a callable with the signature Callable[[Any], bool | Awaitable[bool]] defined in _edge.py. When the workflow engine evaluates routing, it invokes await edge.should_route(payload) to determine traversal eligibility.
The WorkflowBuilder.add_edge() method automatically wraps supplied conditions, extracts human-readable names via _extract_function_name, and stores them in the underlying Edge instance. The condition receives the message payload and must return True (or an awaitable resolving to True) for the edge to activate.
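The contract described above (a predicate that may be synchronous or asynchronous) can be modeled with a small standalone sketch. The function should_route_sketch is a hypothetical stand-in written for this article, not the framework's should_route implementation; it assumes an edge without a condition always routes.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Optional, Union

# Mirrors the documented shape: Callable[[Any], bool | Awaitable[bool]].
ConditionSketch = Callable[[Any], Union[bool, Awaitable[bool]]]


async def should_route_sketch(condition: Optional[ConditionSketch], payload: Any) -> bool:
    """Evaluate a sync or async predicate against a payload (illustrative only)."""
    if condition is None:
        # Assumption: an unconditional edge forwards every message.
        return True
    result = condition(payload)
    if inspect.isawaitable(result):
        # Async predicates return an awaitable that resolves to a bool.
        result = await result
    return bool(result)


def is_ready(msg: Any) -> bool:
    # Guard against non-dict payloads before reading the flag.
    return isinstance(msg, dict) and bool(msg.get("ready", False))


async def is_large(msg: dict) -> bool:
    return len(msg.get("items", [])) > 2


async def demo() -> list[bool]:
    return [
        await should_route_sketch(None, {"x": 1}),
        await should_route_sketch(is_ready, {"ready": True}),
        await should_route_sketch(is_ready, {"ready": False}),
        await should_route_sketch(is_large, {"items": [1, 2, 3]}),
    ]


results = asyncio.run(demo())
print(results)
```

Checking the payload type inside the predicate, as is_ready does here, avoids an AttributeError if a non-dict message ever reaches the edge.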
Serialization Limitations
Edge conditions are not serialized with the graph. As implemented in _edge.py, Edge.to_dict() persists only the condition_name string. The callable itself is discarded; deserialization inserts a placeholder (_missing_callable) that raises RuntimeError if invoked. This ensures missing predicates surface immediately at runtime rather than failing silently.
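The round trip described above can be illustrated with a simplified model. EdgeSketch and _missing_callable_sketch are hypothetical names invented for this sketch; the field names and error message are assumptions and do not reproduce the real Edge class in _edge.py.

```python
from typing import Any, Callable, Optional


def _missing_callable_sketch(name: str) -> Callable[[Any], bool]:
    """Build a placeholder predicate that fails loudly when invoked."""
    def _raise(_payload: Any) -> bool:
        raise RuntimeError(f"Condition '{name}' was not restored after deserialization")
    return _raise


class EdgeSketch:
    """Simplified stand-in for an edge that persists only its condition name."""

    def __init__(self, source_id: str, target_id: str,
                 condition: Optional[Callable[[Any], bool]] = None,
                 condition_name: Optional[str] = None) -> None:
        self.source_id = source_id
        self.target_id = target_id
        self.condition = condition
        self.condition_name = condition_name or getattr(condition, "__name__", None)

    def to_dict(self) -> dict:
        # The callable is dropped; only its name survives.
        return {"source_id": self.source_id, "target_id": self.target_id,
                "condition_name": self.condition_name}

    @classmethod
    def from_dict(cls, data: dict) -> "EdgeSketch":
        name = data.get("condition_name")
        placeholder = _missing_callable_sketch(name) if name else None
        return cls(data["source_id"], data["target_id"], placeholder, name)


def is_ready(msg: dict) -> bool:
    return bool(msg.get("ready"))


data = EdgeSketch("producer", "consumer", is_ready).to_dict()
restored = EdgeSketch.from_dict(data)
try:
    restored.condition({"ready": True})
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
print(data, error_message)
```

The practical consequence is that code restoring a serialized graph has to re-attach the real predicates, for example by looking them up by condition name, before the workflow runs.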
Practical Implementation Examples
Simple Linear Workflow
The following example chains two executors: one converts text to uppercase, the other reverses it.
from typing_extensions import Never

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler


class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        # Forward the upper-cased text to downstream executors.
        await ctx.send_message(text.upper())


class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        # Emit the reversed text as a workflow output.
        await ctx.yield_output(text[::-1])


upper = UpperCaseExecutor(id="upper")
rev = ReverseExecutor(id="rev")
workflow = WorkflowBuilder(start_executor=upper).add_edge(upper, rev).build()
Running the workflow with the input "hello" yields the output list ["OLLEH"]: the text is upper-cased first, then reversed.
Conditional Edge Routing
Use add_edge() with a condition parameter to gate message flow based on payload content.
from typing_extensions import Never  # needed for WorkflowContext[Never, str]


def is_ready(msg: dict) -> bool:
    # Route only payloads explicitly flagged as ready.
    return msg.get("ready", False)


class Producer(Executor):
    @handler
    async def produce(self, trigger: str, ctx: WorkflowContext[dict]) -> None:
        # Handlers take the incoming message first; the trigger value is unused here.
        await ctx.send_message({"payload": 42, "ready": True})


class Consumer(Executor):
    @handler
    async def consume(self, data: dict, ctx: WorkflowContext[Never, str]) -> None:
        await ctx.yield_output(f"Got {data['payload']}")


prod = Producer(id="producer")
cons = Consumer(id="consumer")
wf = (
    WorkflowBuilder(start_executor=prod)
    .add_edge(prod, cons, condition=is_ready)
    .build()
)
If is_ready returns a falsy value, the message is not delivered to the consumer, so the workflow finishes without yielding output; only payloads that pass the predicate reach the consumer.
Fan-Out with Selection Logic
The add_multi_selection_edge_group() method enables dynamic fan-out where a selection function determines which targets receive the message.
def select_high_priority(msg: dict, targets: list) -> list:
    # Return every candidate for high-priority jobs, otherwise only the first one.
    return targets if msg["priority"] == "high" else [targets[0]]


class Dispatcher(Executor):
    @handler
    async def dispatch(self, trigger: str, ctx: WorkflowContext[dict]) -> None:
        # Handlers take the incoming message first; the trigger value is unused here.
        await ctx.send_message({"job": "run", "priority": "low"})


class WorkerA(Executor):
    @handler
    async def work(self, job: dict, ctx: WorkflowContext) -> None:
        print("A handling", job)


class WorkerB(Executor):
    @handler
    async def work(self, job: dict, ctx: WorkflowContext) -> None:
        print("B handling", job)


disp = Dispatcher(id="dispatch")
a = WorkerA(id="a")
b = WorkerB(id="b")
wf = (
    WorkflowBuilder(start_executor=disp)
    .add_multi_selection_edge_group(disp, [a, b], selection_func=select_high_priority)
    .build()
)
Low-priority messages route only to WorkerA; high-priority messages broadcast to both workers.
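Because the selection function is plain Python, its routing decision can be checked in isolation before wiring it into a workflow. The sketch below assumes the function is called with the message and a list of candidate target IDs (here "a" and "b"); that calling convention is an assumption for illustration, so check the signature expected by your framework version.

```python
from typing import Any


def select_high_priority(msg: dict[str, Any], target_ids: list[str]) -> list[str]:
    """Pick all targets for high-priority jobs, otherwise only the first target."""
    if msg.get("priority") == "high":
        return list(target_ids)
    return target_ids[:1]


# Assumed candidate IDs, matching the executor IDs used in the example above.
targets = ["a", "b"]
low = select_high_priority({"job": "run", "priority": "low"}, targets)
high = select_high_priority({"job": "run", "priority": "high"}, targets)
print(low, high)
```

Using msg.get("priority") instead of msg["priority"] makes messages without a priority field fall back to the single-target path rather than raising KeyError.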
Switch-Case Routing Pattern
Implement conditional branching with add_switch_case_edge_group(), which evaluates predicates in order until one matches.
from agent_framework import Case, Default


class Classifier(Executor):
    @handler
    async def classify(self, text: str, ctx: WorkflowContext[dict]) -> None:
        # Wrap the raw input in a typed envelope for the switch-case predicates.
        await ctx.send_message({"type": "json", "data": text})


class JsonHandler(Executor):
    @handler
    async def handle(self, payload: dict, ctx: WorkflowContext) -> None:
        print("JSON:", payload)


class XmlHandler(Executor):
    @handler
    async def handle(self, payload: dict, ctx: WorkflowContext) -> None:
        print("XML:", payload)


class Fallback(Executor):
    @handler
    async def handle(self, payload: dict, ctx: WorkflowContext) -> None:
        print("Unknown:", payload)


clf = Classifier(id="clf")
jsonh = JsonHandler(id="json")
xmlh = XmlHandler(id="xml")
fb = Fallback(id="fallback")
wf = (
    WorkflowBuilder(start_executor=clf)
    .add_switch_case_edge_group(
        clf,
        [
            # Cases are evaluated in order; the first match wins.
            Case(condition=lambda m: m["type"] == "json", target=jsonh),
            Case(condition=lambda m: m["type"] == "xml", target=xmlh),
            Default(target=fb),
        ],
    )
    .build()
)
The first matching Case routes the message; if none match, the Default branch executes.
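The first-match rule matters when predicates overlap. The following standalone sketch models that ordering with a hypothetical pick_target helper; it illustrates the semantics described above and is not the framework's routing code.

```python
from typing import Any, Callable

CasePair = tuple[Callable[[Any], bool], str]


def pick_target(cases: list[CasePair], default_target: str, message: Any) -> str:
    """Return the target of the first matching case, or the default target."""
    for condition, target in cases:
        if condition(message):
            return target
    return default_target


cases: list[CasePair] = [
    (lambda m: m["type"] == "json", "json"),
    # Deliberately overlapping predicate: it also matches "json" messages.
    (lambda m: m["type"] in ("json", "xml"), "xml"),
]

json_target = pick_target(cases, "fallback", {"type": "json"})
xml_target = pick_target(cases, "fallback", {"type": "xml"})
other_target = pick_target(cases, "fallback", {"type": "yaml"})
print(json_target, xml_target, other_target)
```

Since the JSON message satisfies both predicates but is routed only to the first target, the order of Case entries is part of the routing logic: put the most specific predicates first.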
Summary
- The Agent Framework models workflows as directed graphs of executors connected by edges defined in _workflow_builder.py and _edge.py.
- WorkflowBuilder provides a fluent API for constructing graphs including linear chains, fan-out, fan-in, and switch-case routing.
- EdgeCondition callables filter message flow at runtime but are not serialized; only condition names persist in the graph definition.
- Complex routing patterns like multi-selection and conditional branching require no custom engine code when using the built-in builder methods.
Frequently Asked Questions
What is the signature for an edge condition callable?
According to _edge.py, an EdgeCondition must implement Callable[[Any], bool | Awaitable[bool]]. The callable receives the message payload and returns either a boolean or an awaitable resolving to a boolean that determines edge traversal.
Can WorkflowBuilder handle cycles in the workflow graph?
While primarily designed for directed acyclic graphs, WorkflowBuilder supports conditional cycles where edge conditions guard the loop. The underlying execution engine processes these cycles, but you must ensure termination logic within your conditions to prevent infinite loops.
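One way to guarantee termination is to carry an attempt counter in the message and let the loop edge's condition check it. The sketch below simulates such a guard in plain Python; should_retry, MAX_ATTEMPTS, and the message fields are hypothetical names chosen for illustration.

```python
MAX_ATTEMPTS = 3  # hypothetical cap on loop iterations


def should_retry(msg: dict) -> bool:
    """Guard for a loop edge: retry only on failure and below the attempt cap."""
    return (not msg["ok"]) and msg["attempt"] < MAX_ATTEMPTS


def simulate_loop() -> int:
    """Count how often a always-failing step runs before the guard stops the loop."""
    msg = {"ok": False, "attempt": 0}
    visits = 0
    while True:
        visits += 1
        # The step fails every time and increments the attempt counter.
        msg = {"ok": False, "attempt": msg["attempt"] + 1}
        if not should_retry(msg):
            break
    return visits


visits = simulate_loop()
print(visits)
```

In a real workflow, a function like should_retry would be passed as the condition on the edge that leads back to the earlier executor, with a second edge handling the exit path.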
Why do edge conditions raise RuntimeError after deserialization?
Serialization via Edge.to_dict() in _edge.py stores only the condition_name string, not the callable. During deserialization, the placeholder function _missing_callable replaces the original condition and raises RuntimeError when invoked, so a missing predicate fails explicitly instead of silently skipping the edge.
How does fan-in edge grouping handle message aggregation?
The add_fan_in_edges() method creates a FanInEdgeGroup (defined in _edge.py) that converges multiple upstream executors into a single target. The runtime engine manages message buffering and delivery semantics, ensuring upstream messages arrive before the target executor processes them, effectively synchronizing parallel branches.
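As a mental model for this synchronization, the sketch below buffers messages until every expected source has delivered at least once, then releases them as one batch. FanInBufferSketch is a hypothetical class written for this article; the framework's actual buffering and delivery semantics may differ.

```python
from typing import Any, Optional


class FanInBufferSketch:
    """Hold messages until all expected sources have contributed (illustrative)."""

    def __init__(self, source_ids: list[str]) -> None:
        self._expected = set(source_ids)
        self._seen: set[str] = set()
        self._messages: list[Any] = []

    def deliver(self, source_id: str, message: Any) -> Optional[list[Any]]:
        """Record a message; return the batch once every source has reported."""
        if source_id not in self._expected:
            raise KeyError(f"Unknown source: {source_id}")
        self._seen.add(source_id)
        self._messages.append(message)
        if self._seen != self._expected:
            return None  # still waiting for at least one source
        # Release the batch in arrival order and reset for the next round.
        batch, self._messages = self._messages, []
        self._seen = set()
        return batch


buffer = FanInBufferSketch(["a", "b"])
first = buffer.deliver("a", "result from a")
second = buffer.deliver("b", "result from b")
third = buffer.deliver("a", "next round")
print(first, second, third)
```

Under this model, the target executor's handler would receive a list of upstream results rather than a single message, which is worth keeping in mind when choosing its input type.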