# How to Build Graph-Based Workflows with WorkflowBuilder and Edge Conditions in Agent Framework

> Build graph-based workflows in Agent Framework using WorkflowBuilder and edge conditions. Declaratively connect executor nodes with conditional edges to filter message flow.

- Repository: [Microsoft/agent-framework](https://github.com/microsoft/agent-framework)
- Tags: how-to-guide
- Published: 2026-04-05

---

**The Agent Framework's `WorkflowBuilder` class enables declarative construction of directed execution graphs by connecting executor nodes with conditional edges that filter message flow based on runtime payload evaluation.**

The microsoft/agent-framework models workflows as directed graphs where executors (or agents wrapped as executors) process messages and edges define routing paths. The `WorkflowBuilder` class in [`_workflow_builder.py`](https://github.com/microsoft/agent-framework/blob/main/_workflow_builder.py) serves as the central entry point for assembling these graphs declaratively, allowing you to construct complex data-driven routing without implementing custom execution engines.

## Core WorkflowBuilder API Methods

The `WorkflowBuilder` class provides a fluent interface for constructing workflow graphs. According to the source code in [`_workflow_builder.py`](https://github.com/microsoft/agent-framework/blob/main/_workflow_builder.py), the following methods define the graph topology:

- **`WorkflowBuilder(start_executor=...)`** — Declares the entry-point executor that receives initial input.
- **`add_edge(source, target)`** — Creates a single directed edge between two executors for unconditional message forwarding.
- **`add_edge(source, target, condition=my_predicate)`** — Creates a conditional edge where `my_predicate` filters messages; only payloads evaluating to `True` traverse the edge.
- **`add_fan_out_edges(source, [t1, t2, …])`** — Creates a `FanOutEdgeGroup` broadcasting one source to multiple targets.
- **`add_fan_in_edges([s1, s2, …], target)`** — Creates a `FanInEdgeGroup` merging multiple upstream executors into a single target.
- **`add_switch_case_edge_group(source, [Case(...), Default(...)])`** — Implements switch/case routing where the first matching predicate determines the target.
- **`add_multi_selection_edge_group(source, targets, selection_func)`** — Routes to a subset of targets based on custom selection logic.
- **`add_chain([ex1, ex2, …])`** — Convenience method for building linear chains (`ex1 → ex2 → …`).
- **`build()`** — Validates connectivity, type-compatibility, and duplicate IDs, returning an immutable `Workflow` ready for execution.

## Understanding Edge Conditions

An **EdgeCondition** is a callable with the signature `Callable[[Any], bool | Awaitable[bool]]` defined in [`_edge.py`](https://github.com/microsoft/agent-framework/blob/main/_edge.py). When the workflow engine evaluates routing, it invokes `await edge.should_route(payload)` to determine traversal eligibility.

The `WorkflowBuilder.add_edge()` method automatically wraps supplied conditions, extracts human-readable names via `_extract_function_name`, and stores them in the underlying `Edge` instance. The condition receives the message payload and must return `True` (or an awaitable resolving to `True`) for the edge to activate.

### Serialization Limitations

Edge conditions are **not** serialized with the graph. As implemented in [`_edge.py`](https://github.com/microsoft/agent-framework/blob/main/_edge.py), `Edge.to_dict()` persists only the `condition_name` string. The callable itself is discarded; deserialization inserts a placeholder (`_missing_callable`) that raises `RuntimeError` if invoked. This ensures missing predicates surface immediately at runtime rather than failing silently.

## Practical Implementation Examples

### Simple Linear Workflow

The following example chains two executors: one converts text to uppercase, the other reverses it.

```python
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(text.upper())

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        await ctx.yield_output(text[::-1])

upper = UpperCaseExecutor(id="upper")
rev = ReverseExecutor(id="rev")

workflow = WorkflowBuilder(start_executor=upper).add_edge(upper, rev).build()

```

Input `"hello"` produces `["OLLEH"]` as the reversed, upper-cased string.

### Conditional Edge Routing

Use `add_edge()` with a `condition` parameter to gate message flow based on payload content.

```python
def is_ready(msg):
    return msg.get("ready", False)

class Producer(Executor):
    @handler
    async def produce(self, ctx: WorkflowContext[dict]):
        await ctx.send_message({"payload": 42, "ready": True})

class Consumer(Executor):
    @handler
    async def consume(self, data: dict, ctx: WorkflowContext[Never, str]) -> None:
        await ctx.yield_output(f"Got {data['payload']}")

prod = Producer(id="producer")
cons = Consumer(id="consumer")

wf = (
    WorkflowBuilder(start_executor=prod)
    .add_edge(prod, cons, condition=is_ready)
    .build()
)

```

Messages where `ready` is falsy terminate the workflow early; only truthy payloads reach the consumer.

### Fan-Out with Selection Logic

The `add_multi_selection_edge_group()` method enables dynamic fan-out where a selection function determines which targets receive the message.

```python
def select_high_priority(msg, targets):
    return targets if msg["priority"] == "high" else [targets[0]]

class Dispatcher(Executor):
    @handler
    async def dispatch(self, ctx: WorkflowContext[dict]):
        await ctx.send_message({"job": "run", "priority": "low"})

class WorkerA(Executor):
    @handler
    async def work(self, job, ctx: WorkflowContext):
        print("A handling", job)

class WorkerB(Executor):
    @handler
    async def work(self, job, ctx: WorkflowContext):
        print("B handling", job)

disp = Dispatcher(id="dispatch")
a = WorkerA(id="a")
b = WorkerB(id="b")

wf = (
    WorkflowBuilder(start_executor=disp)
    .add_multi_selection_edge_group(disp, [a, b], selection_func=select_high_priority)
    .build()
)

```

Low-priority messages route only to `WorkerA`; high-priority messages broadcast to both workers.

### Switch-Case Routing Pattern

Implement conditional branching with `add_switch_case_edge_group()`, which evaluates predicates in order until one matches.

```python
from agent_framework import Case, Default

class Classifier(Executor):
    @handler
    async def classify(self, ctx: WorkflowContext[str]):
        await ctx.send_message({"type": "json", "data": ctx.input})

class JsonHandler(Executor):
    @handler
    async def handle(self, payload, ctx: WorkflowContext):
        print("JSON:", payload)

class XmlHandler(Executor):
    @handler
    async def handle(self, payload, ctx: WorkflowContext):
        print("XML:", payload)

class Fallback(Executor):
    @handler
    async def handle(self, payload, ctx: WorkflowContext):
        print("Unknown:", payload)

clf = Classifier(id="clf")
jsonh = JsonHandler(id="json")
xmlh = XmlHandler(id="xml")
fb = Fallback(id="fallback")

wf = (
    WorkflowBuilder(start_executor=clf)
    .add_switch_case_edge_group(
        clf,
        [
            Case(condition=lambda m: m["type"] == "json", target=jsonh),
            Case(condition=lambda m: m["type"] == "xml", target=xmlh),
            Default(target=fb),
        ],
    )
    .build()
)

```

The first matching `Case` routes the message; if none match, the `Default` branch executes.

## Summary

- The **Agent Framework** models workflows as directed graphs of executors connected by edges defined in [`_workflow_builder.py`](https://github.com/microsoft/agent-framework/blob/main/_workflow_builder.py) and [`_edge.py`](https://github.com/microsoft/agent-framework/blob/main/_edge.py).
- **WorkflowBuilder** provides a fluent API for constructing graphs including linear chains, fan-out, fan-in, and switch-case routing.
- **EdgeCondition** callables filter message flow at runtime but are not serialized; only condition names persist in the graph definition.
- Complex routing patterns like multi-selection and conditional branching require no custom engine code when using the built-in builder methods.

## Frequently Asked Questions

### What is the signature for an edge condition callable?

According to [`_edge.py`](https://github.com/microsoft/agent-framework/blob/main/_edge.py), an `EdgeCondition` must implement `Callable[[Any], bool | Awaitable[bool]]`. The callable receives the message payload and returns either a boolean or an awaitable resolving to a boolean that determines edge traversal.

### Can WorkflowBuilder handle cycles in the workflow graph?

While primarily designed for directed acyclic graphs, `WorkflowBuilder` supports conditional cycles where **edge conditions** guard the loop. The underlying execution engine processes these cycles, but you must ensure termination logic within your conditions to prevent infinite loops.

### Why do edge conditions raise RuntimeError after deserialization?

As implemented in [`_edge.py`](https://github.com/microsoft/agent-framework/blob/main/_edge.py), the `build()` process validates the graph but serialization via `Edge.to_dict()` stores only the `condition_name` string, not the callable. During deserialization, a placeholder function `_missing_callable` replaces the actual condition and raises `RuntimeError` if invoked, ensuring undefined predicates fail explicitly rather than silently skipping edges.

### How does fan-in edge grouping handle message aggregation?

The `add_fan_in_edges()` method creates a `FanInEdgeGroup` (defined in [`_edge.py`](https://github.com/microsoft/agent-framework/blob/main/_edge.py)) that converges multiple upstream executors into a single target. The runtime engine manages message buffering and delivery semantics, ensuring upstream messages arrive before the target executor processes them, effectively synchronizing parallel branches.