How Langflow's Internal Event System Works: Architecture and Event Types

Langflow implements a lightweight, in-process event bus centered on the EventManager class. Partial callbacks push JSON-serialized events onto a queue, and nine standard event types cover everything from token streaming to vertex lifecycle updates.

The langflow-ai/langflow repository orchestrates visual AI workflows with real-time feedback capabilities. Its internal event system provides the backbone for streaming LLM tokens, tracking vertex execution states, and pushing updates to connected UI clients. This architecture relies on a minimal, queue-based design that decouples event producers from consumers while enforcing a consistent naming convention for callbacks.

Core Architecture of the Event System

The EventManager Class

The foundation resides in src/lfx/src/lfx/events/event_manager.py, where the EventManager class (lines 28-88) maintains an internal queue and a registry of partial callbacks keyed by event names. The manager requires only that the queue object expose a put_nowait method, making it compatible with asyncio.Queue or custom adapters like the webhook forwarding implementation.
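
The put_nowait requirement can be illustrated with a small stand-in. The sketch below is not Langflow code: SupportsPutNowait, ListQueue, and emit are hypothetical names chosen for this example, and the three-element tuple simply mirrors the shape described in the dispatch section of this article.

```python
import asyncio
from typing import Any, Protocol


class SupportsPutNowait(Protocol):
    """Anything with a put_nowait method (hypothetical name for this sketch)."""

    def put_nowait(self, item: Any) -> None: ...


class ListQueue:
    """Hypothetical adapter that stores items in a plain list."""

    def __init__(self) -> None:
        self.items: list[Any] = []

    def put_nowait(self, item: Any) -> None:
        self.items.append(item)


def emit(queue: SupportsPutNowait, item: Any) -> None:
    """Producer side: relies on put_nowait and nothing else."""
    queue.put_nowait(item)


list_queue = ListQueue()
async_queue: asyncio.Queue = asyncio.Queue()

# The same producer code works with both queue flavours.
emit(list_queue, ("id-1", b"{}", 0.0))
emit(async_queue, ("id-2", b"{}", 0.0))
```

Because the producer depends only on put_nowait, swapping in a different sink does not require touching the code that emits events.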

Event Registration and Validation

Events register through register_event(name, event_type, callback=None) (lines 48-65). The method enforces a strict naming convention: all callback names must begin with on_. When no custom callback is provided, the manager creates a default partial that forwards to send_event(event_type, data), enabling fire-and-forget emission patterns.
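
The registration rules above can be sketched with functools.partial. MiniEventManager is a simplified, hypothetical stand-in and not the Langflow class; the keyword arguments bound for custom callbacks (manager and event_type) are an assumption inferred from the custom handler example later in this article.

```python
from functools import partial
from typing import Any, Callable


class MiniEventManager:
    """Simplified stand-in for illustration; not the Langflow class."""

    def __init__(self) -> None:
        self.events: dict[str, Callable[..., Any]] = {}
        self.sent: list[tuple[str, Any]] = []

    def send_event(self, *, event_type: str, data: Any) -> None:
        # Placeholder: records the event instead of serializing and queuing it.
        self.sent.append((event_type, data))

    def register_event(self, name: str, event_type: str, callback=None) -> None:
        if not name.startswith("on_"):
            raise ValueError("Event name must start with 'on_'")
        if callback is None:
            # Default: bind the event type so callers pass only the data.
            handler = partial(self.send_event, event_type=event_type)
        else:
            # Custom: bind the manager and event type (assumed convention).
            handler = partial(callback, manager=self, event_type=event_type)
        self.events[name] = handler

    def __getattr__(self, name: str) -> Callable[..., Any]:
        # Expose registered callbacks as attributes, e.g. mgr.on_token(...).
        events = self.__dict__.get("events", {})
        if name in events:
            return events[name]
        raise AttributeError(name)


mgr = MiniEventManager()
mgr.register_event("on_token", "token")
mgr.on_token(data={"chunk": "Hi"})

# A name without the on_ prefix is rejected.
try:
    mgr.register_event("token", "token")
    rejected = False
except ValueError:
    rejected = True
```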

Event Dispatch Pipeline

The send_event(event_type, data) method (lines 66-83) handles serialization using FastAPI's jsonable_encoder, constructs a JSON envelope {"event": event_type, "data": ...}, generates a UUID-based event ID, and pushes a three-tuple (event_id, bytes, timestamp) onto the queue. This structure ensures every event carries a unique identifier and precise timing metadata.
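
The dispatch steps can be approximated with the standard library alone. This is a sketch, not the Langflow implementation: build_event is a hypothetical helper, json.dumps stands in for FastAPI's jsonable_encoder, and the exact event ID format is an assumption.

```python
import json
import time
import uuid
from queue import SimpleQueue


def build_event(event_type: str, data) -> tuple[str, bytes, float]:
    """Build an (event_id, payload_bytes, timestamp) tuple."""
    envelope = {"event": event_type, "data": data}
    event_id = f"{event_type}-{uuid.uuid4()}"  # ID format is an assumption
    payload = json.dumps(envelope).encode("utf-8")
    return event_id, payload, time.time()


# Any object exposing put_nowait can act as the sink.
queue: SimpleQueue = SimpleQueue()
queue.put_nowait(build_event("token", {"chunk": "Hello"}))

# A consumer unpacks the tuple and decodes the JSON envelope.
event_id, payload, timestamp = queue.get_nowait()
decoded = json.loads(payload)
```

Keeping the payload as bytes means a consumer can forward it to a network stream without re-serializing it.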

Standard Event Types and Factory Functions

Langflow defines nine primary event types through factory functions that pre-populate the callback registry.

The create_default_event_manager(queue=None) function (lines 91-102) registers the full standard set:

  • on_token (token): Individual LLM token chunks
  • on_vertices_sorted (vertices_sorted): Ordering of vertices after a build
  • on_error (error): Errors raised during execution
  • on_end (end): Finalisation of a run
  • on_message (add_message): UI-level chat/message addition
  • on_remove_message (remove_message): Removal of a chat/message
  • on_end_vertex (end_vertex): Completion of a specific vertex
  • on_build_start (build_start): Beginning of a flow build
  • on_build_end (build_end): End of a flow build

For streaming scenarios, create_stream_tokens_event_manager(queue=None) (lines 105-114) registers a minimal subset: message, token, and end, plus ancillary events.
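
One way to picture what the default factory does is a table of (callback name, event type) pairs turned into bound callbacks. The sketch below is illustrative only: STANDARD_EVENTS, build_registry, and record are hypothetical names, and the pairs are the nine listed above.

```python
from functools import partial

# (callback name, event type) pairs as listed in this article.
STANDARD_EVENTS = [
    ("on_token", "token"),
    ("on_vertices_sorted", "vertices_sorted"),
    ("on_error", "error"),
    ("on_end", "end"),
    ("on_message", "add_message"),
    ("on_remove_message", "remove_message"),
    ("on_end_vertex", "end_vertex"),
    ("on_build_start", "build_start"),
    ("on_build_end", "build_end"),
]


def build_registry(send, pairs=STANDARD_EVENTS):
    """Return {callback_name: send bound to its event type}."""
    return {name: partial(send, event_type=etype) for name, etype in pairs}


log = []


def record(*, event_type, data):
    # Stand-in for a real send function: just remember what was emitted.
    log.append((event_type, data))


registry = build_registry(record)
registry["on_message"](data={"text": "hi"})
```

Note that the callback name and the emitted event type are not always the same string: on_message emits add_message.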

Webhook Integration and SSE Forwarding

In production deployments, Langflow bridges the internal event bus to Server-Sent Events (SSE) via the webhook service layer in src/backend/base/langflow/services/event_manager.py.

The WebhookForwardingQueue class implements the put_nowait protocol expected by EventManager, extracting the JSON payload from the event tuple and asynchronously emitting it to SSE subscribers (lines 173-190).
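
The adapter idea can be sketched without Langflow. ForwardingQueue, subscribe, and demo below are hypothetical names, and the design is deliberately simplified: instead of emitting asynchronously as the real class is described as doing, this version hands each decoded payload to a per-subscriber asyncio.Queue that clients read from asynchronously.

```python
import asyncio
import json


class ForwardingQueue:
    """Hypothetical adapter: put_nowait fans payloads out to subscribers."""

    def __init__(self) -> None:
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        # Each subscriber gets its own queue to read from.
        q: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(q)
        return q

    def put_nowait(self, item) -> None:
        # Unpack the (event_id, payload_bytes, timestamp) tuple.
        _event_id, payload, _timestamp = item
        message = json.loads(payload)
        for q in self.subscribers:
            q.put_nowait(message)


async def demo() -> dict:
    fq = ForwardingQueue()
    client = fq.subscribe()
    fq.put_nowait(("token-1", b'{"event": "token", "data": {"chunk": "Hi"}}', 0.0))
    # The client side awaits the next forwarded message.
    return await client.get()


received = asyncio.run(demo())
```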

The create_webhook_event_manager(flow_id, run_id) factory (lines 31-48) constructs an EventManager pre-wired with the standard nine event types, forwarding all emissions through WebhookEventManager.emit (lines 103-129) to connected clients. This enables real-time browser updates without polling.

Implementation Examples

Creating a default manager for local queue consumption:

import asyncio
from lfx.events.event_manager import create_default_event_manager

queue = asyncio.Queue()
manager = create_default_event_manager(queue)

# Emit a token event via the generated callback
manager.on_token(data={"chunk": "Hello world"})

Registering custom event handlers with user-provided logic:

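# The handler takes manager and event_type as keyword arguments,
# plus the data passed at call time.
def log_handler(*, manager, event_type, data):
    print(f"[{event_type}] {data}")

# Reuses the manager created in the previous example.
manager.register_event("on_custom", "custom_event", log_handler)
manager.on_custom(data={"status": "processing"})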

Webhook-enabled manager for SSE streaming to connected clients:

from langflow.services.event_manager import create_webhook_event_manager

webhook_mgr = create_webhook_event_manager(flow_id="flow-123", run_id="run-456")
webhook_mgr.on_message(data={"text": "Streaming response"})

Summary

  • Langflow's event system centers on the EventManager class in src/lfx/src/lfx/events/event_manager.py, which manages partial callbacks and queue-based dispatch.
  • register_event() validates that callback names start with on_, while send_event() handles serialization with FastAPI's jsonable_encoder and generates a UUID-based event ID.
  • Nine standard event types cover token streaming, build phases, vertex completion, messaging, and error handling.
  • Two factory functions—create_default_event_manager() and create_stream_tokens_event_manager()—provide pre-configured managers for different use cases.
  • The webhook service layer adapts the event bus to SSE clients via WebhookForwardingQueue and create_webhook_event_manager() in src/backend/base/langflow/services/event_manager.py.

Frequently Asked Questions

What is the primary class responsible for Langflow's event handling?

The EventManager class defined in src/lfx/src/lfx/events/event_manager.py serves as the core component. It maintains a queue reference and a dictionary of partial callbacks, providing methods for event registration and dispatch.

How are events serialized before being placed on the queue?

The send_event() method uses FastAPI's jsonable_encoder to serialize the payload data, then constructs a JSON envelope containing the event type and data. This envelope is converted to bytes and packaged with a UUID-based event ID and timestamp before being pushed via put_nowait.

Can I create custom events beyond the nine standard types?

Yes. The register_event() method accepts any callback name starting with on_ and any event type string. You can provide a custom callable or rely on the default behavior, which forwards to send_event() with the specified event type.

How does Langflow stream events to web browsers?

When running in webhook mode, Langflow uses create_webhook_event_manager() to instantiate an EventManager paired with a WebhookForwardingQueue. This adapter extracts event payloads and forwards them asynchronously to Server-Sent Events (SSE) subscribers, enabling real-time browser updates.
