# How Langflow's Flow Execution Engine Handles Asynchronous Task Processing

> Discover how Langflow's flow execution engine uses asyncio for parallel processing and real-time streaming. Learn about its efficient asynchronous task handling.

- Repository: [Langflow/langflow](https://github.com/langflow-ai/langflow)
- Tags: internals
- Published: 2026-02-24

---

**Langflow executes flow components in parallel layers using Python's `asyncio`, streaming real-time results through an async generator while maintaining thread-safe state management.**

Langflow is an open-source visual framework for building AI-driven workflows. Its execution engine, found in the `langflow-ai/langflow` repository, processes complex directed graphs of components (vertices) using a sophisticated asynchronous architecture that maximizes parallelism while preserving dependency order.

## Architecture Overview: Layered Parallelism

The engine organizes vertices into **parallel layers** based on their dependencies. Vertices at the same depth in the graph execute simultaneously as independent `asyncio` tasks, while vertices dependent on upstream outputs wait for their layer to complete.

This design lives in [`src/lfx/src/lfx/graph/graph/base.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/graph/graph/base.py), where the `Graph` class orchestrates the entire process. The core loop uses `asyncio.gather()` to run all tasks in a layer concurrently, then determines the next runnable set based on completed outputs.

## Core Implementation Details

The execution pipeline breaks down into distinct phases, each handled by specific methods in the codebase:

| Step | Location | Functionality |
|------|----------|---------------|
| **Initialize Execution** | [`base.py`](https://github.com/langflow-ai/langflow/blob/main/base.py) lines 356-401 | `async_start()` prepares the graph, resets output values, and enters the iterative execution loop. |
| **Create Vertex Tasks** | [`base.py`](https://github.com/langflow-ai/langflow/blob/main/base.py) lines 1650-1700 | `process()` spawns `asyncio.create_task(self.build_vertex(...))` for each runnable vertex, encoding vertex IDs in task names for debugging. |
| **Execute Layer** | [`base.py`](https://github.com/langflow-ai/langflow/blob/main/base.py) lines 1808-1910 | `_execute_tasks()` awaits all tasks with `asyncio.gather(*tasks, return_exceptions=True)`, capturing exceptions without stopping the loop. |
| **Handle Failures** | [`base.py`](https://github.com/langflow-ai/langflow/blob/main/base.py) lines 1830-1839 | Checks `isinstance(result, Exception)`, logs errors, aborts remaining tasks, and propagates the exception upward. |
| **Record Results** | [`base.py`](https://github.com/langflow-ai/langflow/blob/main/base.py) lines 1842-1852 | Stores successful builds in `build_results` and calls `log_vertex_build()` for caching and SSE emission. |
| **Update State** | [`base.py`](https://github.com/langflow-ai/langflow/blob/main/base.py) lines 1525-1564 | `get_next_runnable_vertices()` acquires an `asyncio.Lock` to ensure thread-safe updates to the run-manager state. |
| **Emit Events** | [`utils.py`](https://github.com/langflow-ai/langflow/blob/main/utils.py) | `emit_vertex_build_event()` sends Server-Sent Events containing vertex results, next vertex IDs, and inactivated vertices. |

## The Async Generator Pattern

The engine exposes results through a **single async generator** pattern. The `async_start` method yields `VertexBuildResult` objects immediately when vertices complete, allowing callers to stream partial outputs while downstream vertices continue processing.

```python

# From src/lfx/src/lfx/graph/graph/base.py

async for result in graph.async_start(event_manager=event_manager):
    if isinstance(result, Finish):
        print("Flow execution complete")
        break
    print(f"Vertex {result.vertex.id} completed with valid={result.valid}")
    print(result.result_dict)

```

This pattern enables real-time UI updates in the Langflow interface and allows API consumers to receive Server-Sent Events without waiting for the entire graph to finish.

## Error Handling and Concurrency Control

The engine implements **graceful degradation** through exception isolation. By passing `return_exceptions=True` to `asyncio.gather()`, individual vertex failures don't crash the entire layer. The `_execute_tasks` method inspects each result:

```python

# Conceptual excerpt from base.py _execute_tasks

completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for result in completed_tasks:
    if isinstance(result, Exception):
        # Log error and cancel remaining tasks

        await self._cancel_tasks(tasks)
        raise result

```

**Thread safety** is maintained through an `asyncio.Lock` acquired in `get_next_runnable_vertices()`. This prevents race conditions when multiple tasks complete simultaneously and attempt to update the shared run-manager state that tracks which vertices are ready to execute.

## CLI Integration and Capture

Langflow's CLI leverages the same engine through `execute_graph_with_capture` in [`src/lfx/src/lfx/cli/common.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/cli/common.py) (lines 296-340). This utility redirects `stdout`/`stderr` during execution, runs `graph.async_start`, and returns both structured results and captured logs:

```python
from lfx.cli.common import execute_graph_with_capture

results, logs = await execute_graph_with_capture(
    graph, 
    input_value="Hello"
)

for r in results:
    print(f"{r.vertex.id}: {r.result_dict}")

```

## Summary

- **Layer-wise parallelism** executes independent vertices concurrently using `asyncio.gather()`, maximizing CPU utilization while respecting dependency chains.
- **Async generator streaming** yields results immediately via `async_start()`, enabling real-time UI updates and Server-Sent Events without blocking.
- **Robust error isolation** uses `return_exceptions=True` to catch vertex failures individually, preventing cascading crashes while maintaining detailed error logs.
- **Thread-safe state management** employs `asyncio.Lock` in `get_next_runnable_vertices()` to coordinate access to the run-manager across concurrent task completions.
- **Modular architecture** separates concerns between task creation (`process`), execution (`_execute_tasks`), and event emission (`emit_vertex_build_event`), all orchestrated from [`src/lfx/src/lfx/graph/graph/base.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/graph/graph/base.py).

## Frequently Asked Questions

### How does Langflow determine which vertices can run in parallel?

The engine analyzes the graph structure to identify vertices at the same dependency depth. In `get_next_runnable_vertices()` (lines 1525-1564 of [`base.py`](https://github.com/langflow-ai/langflow/blob/main/base.py)), it checks which vertices have all their upstream dependencies satisfied. These vertices form a "layer" that gets executed concurrently via `asyncio.gather()`.

### What happens when a vertex fails during async execution?

When `asyncio.gather()` returns results with `return_exceptions=True`, the `_execute_tasks` method (lines 1808-1910) inspects each result. If it detects an `Exception` instance, it immediately cancels remaining tasks in that layer, logs the error details, and propagates the exception upward to halt the flow execution.

### Can I stream partial results while the flow is still running?

Yes. The `async_start()` method implements an async generator pattern that yields `VertexBuildResult` objects immediately when each vertex completes. This allows API endpoints to send Server-Sent Events or CLI tools to print progress without waiting for the entire graph to finish, as demonstrated in the `execute_graph_with_capture` utility.

### How does Langflow prevent race conditions when updating vertex states?

The engine uses an `asyncio.Lock` acquired at the beginning of `get_next_runnable_vertices()` to ensure thread-safe access to the run-manager state. This prevents multiple concurrent tasks from simultaneously modifying the set of runnable vertices when they complete at the same time, maintaining consistent graph execution state.