How Langflow's Flow Execution Engine Handles Asynchronous Task Processing

Langflow executes flow components in concurrent layers using Python's asyncio, streaming results through an async generator as vertices complete and guarding shared run state with an asyncio.Lock.

Langflow is an open-source visual framework for building AI-driven workflows. Its execution engine, found in the langflow-ai/langflow repository, processes directed graphs of components (vertices) asynchronously, running independent vertices concurrently while preserving dependency order.

Architecture Overview: Layered Parallelism

The engine organizes vertices into layers based on their dependencies. Vertices at the same depth in the graph run concurrently as independent asyncio tasks on one event loop, while vertices that depend on upstream outputs wait until the layer producing those outputs has completed.

This design lives in src/lfx/src/lfx/graph/graph/base.py, where the Graph class orchestrates the entire process. The core loop uses asyncio.gather() to run all tasks in a layer concurrently, then determines the next runnable set based on completed outputs.
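
The layer loop described above can be sketched on a toy graph. This is a minimal illustration, not Langflow's code: the DEPENDENCIES map, build_vertex, and run_in_layers are hypothetical names, and the real Graph class tracks far more state.

```python
import asyncio

# Hypothetical toy graph: each vertex maps to the vertices it depends on.
DEPENDENCIES = {
    "input": set(),
    "prompt": {"input"},
    "retriever": {"input"},
    "llm": {"prompt", "retriever"},
}


async def build_vertex(vertex_id, upstream):
    """Stand-in for real vertex work; sleeps briefly to simulate I/O."""
    await asyncio.sleep(0.01)
    return f"{vertex_id}({','.join(sorted(upstream))})"


async def run_in_layers(dependencies):
    results = {}
    layers = []
    remaining = {v: set(deps) for v, deps in dependencies.items()}
    while remaining:
        # A vertex is runnable once every one of its dependencies has a result.
        layer = sorted(v for v, deps in remaining.items() if deps.issubset(results))
        if not layer:
            raise ValueError("cycle detected: no runnable vertices")
        tasks = [
            asyncio.create_task(
                build_vertex(v, {d: results[d] for d in remaining[v]}), name=v
            )
            for v in layer
        ]
        # Every task in the layer runs concurrently on the same event loop.
        outputs = await asyncio.gather(*tasks)
        results.update(zip(layer, outputs))
        for v in layer:
            del remaining[v]
        layers.append(layer)
    return layers, results


layers, results = asyncio.run(run_in_layers(DEPENDENCIES))
print(layers)
```

In this sketch "prompt" and "retriever" share a layer because both depend only on "input", and "llm" waits for both of them.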

Core Implementation Details

The execution pipeline breaks down into distinct phases, each handled by specific methods in the codebase:

| Step | Location | Functionality |
|---|---|---|
| Initialize Execution | base.py lines 356-401 | async_start() prepares the graph, resets output values, and enters the iterative execution loop. |
| Create Vertex Tasks | base.py lines 1650-1700 | process() spawns asyncio.create_task(self.build_vertex(...)) for each runnable vertex, encoding vertex IDs in task names for debugging. |
| Execute Layer | base.py lines 1808-1910 | _execute_tasks() awaits all tasks with asyncio.gather(*tasks, return_exceptions=True), so exceptions are returned as results instead of interrupting the gather call. |
| Handle Failures | base.py lines 1830-1839 | Checks isinstance(result, Exception), logs errors, aborts remaining tasks, and propagates the exception upward. |
| Record Results | base.py lines 1842-1852 | Stores successful builds in build_results and calls log_vertex_build() for caching and SSE emission. |
| Update State | base.py lines 1525-1564 | get_next_runnable_vertices() acquires an asyncio.Lock so that concurrent tasks update the run-manager state one at a time. |
| Emit Events | utils.py | emit_vertex_build_event() sends Server-Sent Events containing vertex results, next vertex IDs, and inactivated vertices. |
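
To make the event-emission step concrete, here is a rough sketch of how one Server-Sent Event frame could be serialized. The format_sse helper, the "vertex_build" event name, and the payload keys are all assumptions chosen for illustration; only the wire format (an event line, a data line, then a blank line) comes from the SSE standard.

```python
import json


def format_sse(event, payload):
    """Serialize one SSE frame: event name, JSON data line, blank-line terminator."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


# Hypothetical payload shaped after the fields the article lists.
frame = format_sse(
    "vertex_build",
    {
        "id": "llm",
        "valid": True,
        "next_vertices_ids": ["output"],
        "inactivated_vertices": [],
    },
)
print(frame, end="")
```

json.dumps emits a single line by default, which keeps the whole payload on one data line.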

The Async Generator Pattern

The engine exposes results through a single async generator. The async_start method yields a VertexBuildResult object as soon as each vertex completes, so callers can stream partial outputs while downstream vertices are still processing.


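# Consuming the async generator defined in src/lfx/src/lfx/graph/graph/base.py.
# Assumes graph, event_manager, and the Finish sentinel are already in scope,
# and that this loop runs inside a coroutine.
async for result in graph.async_start(event_manager=event_manager):
    if isinstance(result, Finish):
        print("Flow execution complete")
        break
    print(f"Vertex {result.vertex.id} completed with valid={result.valid}")
    print(result.result_dict)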

This pattern enables real-time UI updates in the Langflow interface and allows API consumers to receive Server-Sent Events without waiting for the entire graph to finish.
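
The streaming behavior can be reproduced in isolation with a small async generator. This sketch swaps in asyncio.as_completed to yield results in completion order, which is not necessarily how async_start is implemented; BuildResult, Finish, and stream_results are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class BuildResult:
    """Hypothetical stand-in for VertexBuildResult."""
    vertex_id: str
    valid: bool


class Finish:
    """Hypothetical sentinel marking the end of the stream."""


async def build(vertex_id, delay):
    await asyncio.sleep(delay)
    return BuildResult(vertex_id, True)


async def stream_results(delays):
    tasks = [asyncio.create_task(build(v, d)) for v, d in delays.items()]
    # Yield each result as soon as its task finishes, not in submission order.
    for finished in asyncio.as_completed(tasks):
        yield await finished
    yield Finish()


async def main():
    seen = []
    async for item in stream_results({"slow": 0.05, "fast": 0.01}):
        if isinstance(item, Finish):
            break
        seen.append(item.vertex_id)
    return seen


order = asyncio.run(main())
print(order)
```

The consumer sees "fast" before "slow" even though "slow" was submitted first, which is the property that lets a UI update before the whole graph finishes.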

Error Handling and Concurrency Control

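The engine isolates exceptions at the layer level. Passing return_exceptions=True to asyncio.gather() makes a failing vertex appear as an exception object in the result list instead of raising out of gather() at the first failure, so every task in the layer is accounted for before the engine reacts. The _execute_tasks method then inspects each result: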


# Conceptual excerpt from _execute_tasks in base.py
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for result in completed_tasks:
    if isinstance(result, Exception):
        # Log the error, cancel the remaining tasks, then propagate
        await self._cancel_tasks(tasks)
        raise result
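
A self-contained version of this inspection loop is sketched below. It is an approximation under stated assumptions: build_vertex and execute_layer are hypothetical, and wrapping the failure in a RuntimeError is a choice made for the example rather than something the engine is known to do.

```python
import asyncio


async def build_vertex(vertex_id):
    """Hypothetical vertex build that fails for one specific ID."""
    await asyncio.sleep(0.01)
    if vertex_id == "broken":
        raise ValueError(f"vertex {vertex_id} failed")
    return f"{vertex_id}: ok"


async def execute_layer(vertex_ids):
    tasks = [asyncio.create_task(build_vertex(v), name=v) for v in vertex_ids]
    # Exceptions come back as list items, in the same order as `tasks`.
    completed = await asyncio.gather(*tasks, return_exceptions=True)
    built = {}
    for task, result in zip(tasks, completed):
        if isinstance(result, Exception):
            for other in tasks:
                other.cancel()  # no effect on tasks that already finished
            raise RuntimeError(f"layer aborted at {task.get_name()}") from result
        built[task.get_name()] = result
    return built


async def main():
    ok = await execute_layer(["a", "b"])
    try:
        await execute_layer(["a", "broken", "c"])
    except RuntimeError as exc:
        failure = (str(exc), type(exc.__cause__).__name__)
    return ok, failure


ok, failure = asyncio.run(main())
print(ok)
print(failure)
```

One detail worth knowing when adapting this: since Python 3.8, asyncio.CancelledError derives from BaseException, so an isinstance(result, Exception) check does not match the result of a cancelled task.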

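Shared state is protected by an asyncio.Lock acquired in get_next_runnable_vertices(). An asyncio.Lock coordinates coroutines on a single event loop rather than OS threads, so the guarantee here is that only one task at a time updates the run-manager state. This prevents race conditions when several tasks finish close together and each tries to update the record of which vertices are ready to execute.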
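
The following sketch shows why a lock matters even on a single event loop: an update that awaits in the middle can be interleaved with another task's update. RunState and mark_done are hypothetical names, and the two asyncio.sleep(0) calls stand in for whatever awaited work the real method performs.

```python
import asyncio


class RunState:
    """Hypothetical stand-in for the run-manager state."""

    def __init__(self, dependencies):
        # Maps each not-yet-scheduled vertex to its unmet dependencies.
        self.unmet = {v: set(deps) for v, deps in dependencies.items()}
        self.scheduled = []
        self.lock = asyncio.Lock()

    async def mark_done(self, vertex_id):
        # The lock makes the whole read-modify-write sequence exclusive,
        # even though it contains suspension points.
        async with self.lock:
            for deps in self.unmet.values():
                deps.discard(vertex_id)
            await asyncio.sleep(0)  # suspension point: other tasks may run
            ready = sorted(v for v, deps in self.unmet.items() if not deps)
            await asyncio.sleep(0)  # second suspension point
            for v in ready:
                del self.unmet[v]
                self.scheduled.append(v)
            return ready


async def main():
    state = RunState({"c": {"a", "b"}})
    newly_ready = await asyncio.gather(state.mark_done("a"), state.mark_done("b"))
    return newly_ready, state.scheduled


newly_ready, scheduled = asyncio.run(main())
print(newly_ready, scheduled)
```

Without the async with block, both calls could compute the same ready list before either removes "c", and the second removal would fail with a KeyError. Note that asyncio.Lock is not safe to share across OS threads.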

CLI Integration and Capture

Langflow's CLI leverages the same engine through execute_graph_with_capture in src/lfx/src/lfx/cli/common.py (lines 296-340). This utility redirects stdout/stderr during execution, runs graph.async_start, and returns both structured results and captured logs:

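from lfx.cli.common import execute_graph_with_capture

# graph is assumed to be an already-loaded Graph instance; the await below
# must run inside a coroutine (for example, one started with asyncio.run()).
results, logs = await execute_graph_with_capture(
    graph,
    input_value="Hello",
)

for r in results:
    print(f"{r.vertex.id}: {r.result_dict}")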
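
The capture technique itself can be illustrated with the standard library alone. This is a sketch of the general idea, not the body of execute_graph_with_capture: run_with_capture and noisy_flow are hypothetical, and the real utility may capture output differently.

```python
import asyncio
import contextlib
import io


async def noisy_flow(input_value):
    """Hypothetical flow that prints while it runs."""
    print(f"processing {input_value}")
    await asyncio.sleep(0)
    return [input_value.upper()]


async def run_with_capture(coro_fn, *args):
    stdout_buf, stderr_buf = io.StringIO(), io.StringIO()
    # Redirect both streams for the duration of the awaited call.
    with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
        results = await coro_fn(*args)
    return results, stdout_buf.getvalue() + stderr_buf.getvalue()


results, logs = asyncio.run(run_with_capture(noisy_flow, "hello"))
print(results)
print(repr(logs))
```

Because redirect_stdout replaces sys.stdout for the whole process, output printed by any other coroutine that runs during the await is captured too.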

Summary

  • Layer-wise concurrency runs independent vertices together using asyncio.gather(), overlapping their wait time while respecting dependency chains.
  • Async generator streaming yields results via async_start() as each vertex completes, enabling real-time UI updates and Server-Sent Events without waiting for the whole graph.
  • Error isolation uses return_exceptions=True so each vertex failure is returned as a value and inspected individually; the failure is logged, remaining tasks are cancelled, and the exception propagates to halt the flow.
  • Coroutine-safe state management employs asyncio.Lock in get_next_runnable_vertices() to coordinate access to the run-manager across concurrent task completions.
  • Modular architecture separates concerns between task creation (process), execution (_execute_tasks), and event emission (emit_vertex_build_event), with the graph logic orchestrated from src/lfx/src/lfx/graph/graph/base.py.

Frequently Asked Questions

How does Langflow determine which vertices can run in parallel?

The engine analyzes the graph structure to identify vertices at the same dependency depth. In get_next_runnable_vertices() (lines 1525-1564 of base.py), it checks which vertices have all their upstream dependencies satisfied. These vertices form a "layer" that gets executed concurrently via asyncio.gather().

What happens when a vertex fails during async execution?

When asyncio.gather() returns results with return_exceptions=True, the _execute_tasks method (lines 1808-1910) inspects each result. If it detects an Exception instance, it immediately cancels remaining tasks in that layer, logs the error details, and propagates the exception upward to halt the flow execution.

Can I stream partial results while the flow is still running?

Yes. The async_start() method implements an async generator pattern that yields VertexBuildResult objects immediately when each vertex completes. This allows API endpoints to send Server-Sent Events or CLI tools to print progress without waiting for the entire graph to finish, as demonstrated in the execute_graph_with_capture utility.

How does Langflow prevent race conditions when updating vertex states?

The engine uses an asyncio.Lock acquired at the beginning of get_next_runnable_vertices() to serialize access to the run-manager state. This prevents concurrent tasks from interleaving their modifications to the set of runnable vertices when they complete close together, keeping the graph execution state consistent. The lock coordinates coroutines on one event loop; it does not provide safety across OS threads.
