# How Langflow's Internal Event System Works: Architecture and Event Types

> Explore Langflow's internal event system, powered by EventManager. Discover its architecture and nine event types, from token streaming to vertex lifecycle management.

- Repository: [langflow-ai/langflow](https://github.com/langflow-ai/langflow)
- Tags: internals
- Published: 2026-02-24

---

**Langflow implements a lightweight, in-process event bus centered on the `EventManager` class that queues JSON-serialized events via partial callbacks, supporting nine standard event types ranging from token streaming to vertex lifecycle management.**

The `langflow-ai/langflow` repository orchestrates visual AI workflows with real-time feedback. Its internal event system is the backbone for streaming LLM tokens, tracking vertex execution states, and pushing updates to connected UI clients. The design is minimal and queue-based: producers call named callbacks, consumers read from the queue, and the only contract between them is a naming convention for callbacks and a fixed event envelope.

## Core Architecture of the Event System

### The EventManager Class

The foundation resides in [`src/lfx/src/lfx/events/event_manager.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/events/event_manager.py), where the `EventManager` class (lines 28-88) maintains an internal queue and a registry of **partial callbacks** keyed by event names. The manager requires only that the queue object expose a `put_nowait` method, making it compatible with `asyncio.Queue` or custom adapters like the webhook forwarding implementation.
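To make the `put_nowait` requirement concrete, here is a minimal sketch of the duck-typed contract. `ListQueue` and `emit` are hypothetical names used only for illustration; they are not part of Langflow. The sketch shows that a producer relying only on `put_nowait` works with both `asyncio.Queue` and a hand-written adapter.

```python
import asyncio


class ListQueue:
    """Hypothetical adapter: the only method a producer needs is put_nowait."""

    def __init__(self):
        self.items = []

    def put_nowait(self, item):
        # Store the item synchronously; this never blocks.
        self.items.append(item)


def emit(queue, item):
    # Stand-in for a producer that depends only on the put_nowait duck type.
    queue.put_nowait(item)


list_queue = ListQueue()
async_queue = asyncio.Queue()

# The same three-tuple shape (event_id, bytes, timestamp) goes to both queues.
emit(list_queue, ("id-1", b"{}", 0.0))
emit(async_queue, ("id-1", b"{}", 0.0))
```

Because the contract is a single method, swapping the transport (in-memory queue, forwarding adapter, test double) does not require touching the code that emits events.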

### Event Registration and Validation

Events register through `register_event(name, event_type, callback=None)` (lines 48-65). The method enforces a strict naming convention: all callback names must begin with `on_`. When no custom callback is provided, the manager creates a default partial that forwards to `send_event(event_type, data)`, enabling fire-and-forget emission patterns.
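The registration behavior described above can be sketched with `functools.partial`. This is an illustrative stand-in, not the real `EventManager`: the class name `MiniEventManager`, the `sent` list, the error messages, and the use of `__getattr__` for attribute-style access are all assumptions made for the example.

```python
from functools import partial


class MiniEventManager:
    """Illustrative stand-in for the registration logic (not Langflow's class)."""

    def __init__(self):
        self.events = {}
        self.sent = []  # records emissions instead of using a real queue

    def send_event(self, *, event_type, data):
        self.sent.append((event_type, data))

    def register_event(self, name, event_type, callback=None):
        # Enforce the naming convention before storing anything.
        if not name.startswith("on_"):
            raise ValueError("Event name must start with 'on_'")
        if callback is None:
            # Default: a partial that forwards straight to send_event.
            self.events[name] = partial(self.send_event, event_type=event_type)
        else:
            # Custom: bind the manager and event type, leave `data` to the caller.
            self.events[name] = partial(callback, manager=self, event_type=event_type)

    def __getattr__(self, name):
        # Only reached when normal lookup fails; exposes callbacks as attributes.
        events = self.__dict__.get("events", {})
        if name in events:
            return events[name]
        raise AttributeError(name)


mini = MiniEventManager()
mini.register_event("on_token", "token")
mini.on_token(data={"chunk": "Hi"})  # recorded as ("token", {"chunk": "Hi"})
```

Binding `event_type` at registration time is what lets call sites stay short: the caller supplies only `data`, and the event type cannot be mistyped at the point of emission.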

### Event Dispatch Pipeline

The `send_event(event_type, data)` method (lines 66-83) handles serialization using FastAPI's `jsonable_encoder`, constructs a JSON envelope `{"event": event_type, "data": ...}`, generates a UUID-based event ID, and pushes a three-tuple `(event_id, bytes, timestamp)` onto the queue. This structure ensures every event carries a unique identifier and precise timing metadata.
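The dispatch steps can be approximated with the standard library alone. In this sketch, `json.dumps` stands in for FastAPI's `jsonable_encoder`, so it only handles data that is already JSON-compatible; the function name `build_event` and the exact ID format are assumptions for illustration.

```python
import json
import time
import uuid


def build_event(event_type, data):
    """Build the (event_id, bytes, timestamp) tuple described above."""
    # Envelope shape taken from the text: {"event": ..., "data": ...}
    envelope = {"event": event_type, "data": data}
    # Assumption: a bare UUID4 string; the real ID format may differ.
    event_id = str(uuid.uuid4())
    raw = json.dumps(envelope).encode("utf-8")
    return event_id, raw, time.time()


event_id, raw, timestamp = build_event("token", {"chunk": "Hi"})
decoded = json.loads(raw)  # round-trips back to the envelope dict
```

Encoding to bytes before enqueueing means consumers can write the payload to a stream without re-serializing it.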

## Standard Event Types and Factory Functions

Langflow defines nine primary event types through factory functions that pre-populate the callback registry.

The `create_default_event_manager(queue=None)` function (lines 91-102) registers the full standard set:

- **on_token** → `token`: Individual LLM token chunks
- **on_vertices_sorted** → `vertices_sorted`: Ordering of vertices after a build
- **on_error** → `error`: Errors raised during execution
- **on_end** → `end`: Finalisation of a run
- **on_message** → `add_message`: UI-level chat/message addition
- **on_remove_message** → `remove_message`: Removal of a chat/message
- **on_end_vertex** → `end_vertex`: Completion of a specific vertex
- **on_build_start** → `build_start`: Beginning of a flow build
- **on_build_end** → `build_end`: End of a flow build
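The mapping in the list above can be expressed as plain data, which makes the callback-name to event-type relationship easy to audit. `DEFAULT_EVENTS` and `register_all` are hypothetical names; the sketch only assumes a `register(name, event_type)` callable with the same argument order as `register_event`.

```python
# Callback name -> event type, copied from the list above.
DEFAULT_EVENTS = {
    "on_token": "token",
    "on_vertices_sorted": "vertices_sorted",
    "on_error": "error",
    "on_end": "end",
    "on_message": "add_message",
    "on_remove_message": "remove_message",
    "on_end_vertex": "end_vertex",
    "on_build_start": "build_start",
    "on_build_end": "build_end",
}


def register_all(register, mapping):
    """Call register(name, event_type) for each entry in the mapping."""
    for name, event_type in mapping.items():
        register(name, event_type)


registered = []
register_all(lambda name, event_type: registered.append((name, event_type)), DEFAULT_EVENTS)
```

Note that `on_message` is the one entry whose event type (`add_message`) does not simply drop the `on_` prefix.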

For streaming scenarios, `create_stream_tokens_event_manager(queue=None)` (lines 105-114) registers a minimal subset: `message`, `token`, and `end`, plus ancillary events.

## Webhook Integration and SSE Forwarding

In production deployments, Langflow bridges the internal event bus to Server-Sent Events (SSE) via the webhook service layer in [`src/backend/base/langflow/services/event_manager.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/event_manager.py).

The `WebhookForwardingQueue` class implements the `put_nowait` protocol expected by `EventManager`, extracting the JSON payload from the event tuple and asynchronously emitting it to SSE subscribers (lines 173-190).
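A forwarding adapter of this kind might look roughly like the following. This is a sketch in the spirit of `WebhookForwardingQueue`, not its actual code: `ForwardingQueue`, `drain`, and the `emit(event_type, data)` signature are assumptions, and the sketch requires `put_nowait` to be called from inside a running event loop.

```python
import asyncio
import json


class ForwardingQueue:
    """Hypothetical put_nowait adapter that forwards payloads to an async sink."""

    def __init__(self, emit):
        self._emit = emit    # async callable: (event_type, data) -> None
        self._tasks = set()  # hold references so pending tasks are not collected

    def put_nowait(self, item):
        # Unpack the three-tuple and decode the JSON envelope.
        _event_id, raw, _timestamp = item
        envelope = json.loads(raw.decode("utf-8"))
        # Schedule the forward without blocking the producer.
        task = asyncio.get_running_loop().create_task(
            self._emit(envelope["event"], envelope["data"])
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def drain(self):
        # Wait for in-flight forwards; useful at shutdown and in tests.
        if self._tasks:
            await asyncio.gather(*list(self._tasks))


async def main():
    received = []

    async def fake_emit(event_type, data):
        received.append((event_type, data))

    queue = ForwardingQueue(fake_emit)
    raw = json.dumps({"event": "token", "data": {"chunk": "Hi"}}).encode("utf-8")
    queue.put_nowait(("id-1", raw, 0.0))
    await queue.drain()
    return received


received = asyncio.run(main())
```

The producer side stays synchronous (`put_nowait` returns immediately), while delivery to subscribers happens as scheduled tasks on the event loop.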

The `create_webhook_event_manager(flow_id, run_id)` factory (lines 31-48) constructs an `EventManager` pre-wired with the standard nine event types, forwarding all emissions through `WebhookEventManager.emit` (lines 103-129) to connected clients. This enables real-time browser updates without polling.

## Implementation Examples

Creating a default manager for local queue consumption:

```python
import asyncio
from lfx.events.event_manager import create_default_event_manager

queue = asyncio.Queue()
manager = create_default_event_manager(queue)

# Emit a token event via the generated callback
manager.on_token(data={"chunk": "Hello world"})

# The queue now holds one (event_id, bytes, timestamp) tuple
event_id, payload, timestamp = queue.get_nowait()
```

Registering custom event handlers with user-provided logic:

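```python
# Reuses `manager` from the previous example.
# Custom callbacks receive the manager and event type as keyword arguments.
def log_handler(*, manager, event_type, data):
    print(f"[{event_type}] {data}")

manager.register_event("on_custom", "custom_event", log_handler)
manager.on_custom(data={"status": "processing"})
```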

Webhook-enabled manager for SSE streaming to connected clients:

```python
from langflow.services.event_manager import create_webhook_event_manager

webhook_mgr = create_webhook_event_manager(flow_id="flow-123", run_id="run-456")
webhook_mgr.on_message(data={"text": "Streaming response"})
```
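On the consuming side, a reader of the queue unpacks each three-tuple and decodes the JSON envelope. The sketch below is self-contained and simulates the producer with hand-built tuples instead of importing Langflow; the `consume` helper and the choice to stop on the `end` event are assumptions for illustration.

```python
import asyncio
import json


async def consume(queue, stop_event="end"):
    """Read envelopes from the queue until the stop event arrives."""
    envelopes = []
    while True:
        _event_id, raw, _timestamp = await queue.get()
        envelope = json.loads(raw)  # json.loads accepts UTF-8 bytes directly
        envelopes.append(envelope)
        if envelope["event"] == stop_event:
            return envelopes


async def main():
    queue = asyncio.Queue()
    # Simulated producer output in the (event_id, bytes, timestamp) shape.
    for event_type, data in [
        ("token", {"chunk": "Hel"}),
        ("token", {"chunk": "lo"}),
        ("end", {}),
    ]:
        raw = json.dumps({"event": event_type, "data": data}).encode("utf-8")
        queue.put_nowait((f"id-{event_type}", raw, 0.0))
    return await consume(queue)


envelopes = asyncio.run(main())
# Reassemble the streamed text from the token events.
text = "".join(e["data"].get("chunk", "") for e in envelopes)
```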

## Summary

- Langflow's event system centers on the `EventManager` class in [`src/lfx/src/lfx/events/event_manager.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/events/event_manager.py), which manages partial callbacks and queue-based dispatch.
- `register_event()` validates that callback names start with `on_`, while `send_event()` handles serialization with FastAPI's `jsonable_encoder`, UUID-based ID generation, and queueing.
- Nine standard event types cover token streaming, vertex and build lifecycle, messaging, and error handling.
- Two factory functions, `create_default_event_manager()` and `create_stream_tokens_event_manager()`, provide pre-configured managers for different use cases.
- The webhook service layer adapts the event bus to SSE clients via `WebhookForwardingQueue` and `create_webhook_event_manager()` in [`src/backend/base/langflow/services/event_manager.py`](https://github.com/langflow-ai/langflow/blob/main/src/backend/base/langflow/services/event_manager.py).

## Frequently Asked Questions

### What is the primary class responsible for Langflow's event handling?

The `EventManager` class defined in [`src/lfx/src/lfx/events/event_manager.py`](https://github.com/langflow-ai/langflow/blob/main/src/lfx/src/lfx/events/event_manager.py) serves as the core component. It maintains a queue reference and a dictionary of partial callbacks, providing methods for event registration and dispatch.

### How are events serialized before being placed on the queue?

The `send_event()` method uses FastAPI's `jsonable_encoder` to serialize the payload data, then constructs a JSON envelope containing the event type and data. This envelope is converted to bytes and packaged with a UUID-based event ID and timestamp before being pushed via `put_nowait`.

### Can I create custom events beyond the nine standard types?

Yes. The `register_event()` method accepts any callback name starting with `on_` and any event type string. You can provide a custom callable or rely on the default behavior, which forwards to `send_event()` with the specified event type.

### How does Langflow stream events to web browsers?

When running in webhook mode, Langflow uses `create_webhook_event_manager()` to instantiate an `EventManager` paired with a `WebhookForwardingQueue`. This adapter extracts event payloads and forwards them asynchronously to Server-Sent Events (SSE) subscribers, enabling real-time browser updates.