# Pipeline Stages in Headroom and How PipelineExtensionManager Hooks Work

> Understand Headroom pipeline stages and how PipelineExtensionManager hooks enable request inspection and mutation without aborting the flow. Explore the 11 canonical lifecycle stages.

- Repository: [Tejas Chopra/headroom](https://github.com/chopratejas/headroom)
- Tags: internals
- Published: 2026-06-06

---

**Headroom processes every request through 11 canonical lifecycle stages defined in [`headroom/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/pipeline.py), and the `PipelineExtensionManager` broadcasts `PipelineEvent` objects to registered extensions via the `on_pipeline_event` hook protocol, allowing inspection, mutation, or replacement of requests without aborting the pipeline flow.**

The `chopratejas/headroom` repository implements a fixed, extensible request-processing architecture. Understanding the precise order of pipeline stages and the hook mechanism is essential for developers building custom middleware, logging agents, or request transformers that integrate with the Headroom LLM proxy.

## The 11 Canonical Pipeline Stages

Headroom defines its processing lifecycle in the `PipelineStage` enum within [`headroom/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/pipeline.py). These stages execute in a strict, stable sequence that never changes without a deprecation cycle, ensuring extensions can rely on specific hook points.

- **SETUP** – Initial configuration before any request handling begins.
- **PRE_START** – Preparations that run just before the pipeline officially starts.
- **POST_START** – Work that occurs immediately after the pipeline has started.
- **INPUT_RECEIVED** – The raw request payload has been received from the client.
- **INPUT_CACHED** – The request (or parts of it) has been stored in the request-cache.
- **INPUT_ROUTED** – Content-routing logic decides which transforms will apply to this request.
- **INPUT_COMPRESSED** – The request body undergoes compression (e.g., token-reduction strategies).
- **INPUT_REMEMBERED** – Memory-related processing executes, such as retrieval from a vector store.
- **PRE_SEND** – The request is about to be dispatched to the downstream LLM or provider.
- **POST_SEND** – The request was sent; post-send bookkeeping runs.
- **RESPONSE_RECEIVED** – The LLM response arrives and is ready for further processing or forwarding.

These stages form the `CANONICAL_PIPELINE_STAGES` tuple, providing predictable interception points for observability and modification.
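To make the ordering concrete, here is a minimal sketch of how such an enum and tuple could be laid out. It is a stand-in, not the code from `headroom/pipeline.py`: the lowercase string values and the helpers `stage_index` and `runs_before` are assumptions made for illustration.

```python
from enum import Enum


class PipelineStage(Enum):
    """Stand-in for headroom's PipelineStage; the string values are assumed."""

    SETUP = "setup"
    PRE_START = "pre_start"
    POST_START = "post_start"
    INPUT_RECEIVED = "input_received"
    INPUT_CACHED = "input_cached"
    INPUT_ROUTED = "input_routed"
    INPUT_COMPRESSED = "input_compressed"
    INPUT_REMEMBERED = "input_remembered"
    PRE_SEND = "pre_send"
    POST_SEND = "post_send"
    RESPONSE_RECEIVED = "response_received"


# Iterating an Enum yields members in definition order,
# so the tuple preserves the lifecycle order listed above.
CANONICAL_PIPELINE_STAGES = tuple(PipelineStage)


def stage_index(stage: PipelineStage) -> int:
    """Hypothetical helper: zero-based position of a stage in the canonical order."""
    return CANONICAL_PIPELINE_STAGES.index(stage)


def runs_before(first: PipelineStage, second: PipelineStage) -> bool:
    """Hypothetical helper: True if `first` comes earlier in the lifecycle."""
    return stage_index(first) < stage_index(second)
```

An extension that needs to know whether routing has already happened by the time it sees `PRE_SEND` could use a comparison like `runs_before(PipelineStage.INPUT_ROUTED, PipelineStage.PRE_SEND)`.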

## How PipelineExtensionManager Dispatches Hooks

The `PipelineExtensionManager` class (also in [`headroom/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/pipeline.py)) serves as the central dispatcher for the pipeline’s extensibility layer. Its primary responsibility is to instantiate `PipelineEvent` objects and broadcast them to every registered extension at the current stage.

### The PipelineEvent Dataclass

Each hook invocation receives a `PipelineEvent` object containing the full context of the current request:

- **stage** – The current `PipelineStage` enum value.
- **operation** – The operation name (e.g., "chat").
- **request_id** – Unique identifier for the request.
- **provider** and **model** – Target LLM provider and model identifiers.
- **messages**, **tools**, **headers** – Payload components being sent to the provider.
- **response** – The raw response from the LLM (populated in later stages).
- **metadata** – A free-form dictionary for passing data between stages or extensions.
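The field list above maps naturally onto a dataclass. The sketch below is an assumption about its shape, not the definition from `headroom/pipeline.py`: the field types and defaults are inferred from how the examples in this article use the event (for instance, `headers` may be unset and `metadata` is a dictionary), and the two-member enum is a reduced stand-in.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class PipelineStage(Enum):
    """Reduced stand-in enum; only the members needed for this sketch."""

    INPUT_RECEIVED = "input_received"
    RESPONSE_RECEIVED = "response_received"


@dataclass
class PipelineEvent:
    """Stand-in event; types and defaults are assumptions, not the real definition."""

    stage: PipelineStage
    operation: str
    request_id: str = ""
    provider: str = ""
    model: str = ""
    messages: Optional[list] = None
    tools: Optional[list] = None
    headers: Optional[dict] = None
    response: Any = None  # expected to be populated only in later stages
    # default_factory gives every event its own dict instead of a shared one
    metadata: dict = field(default_factory=dict)


event = PipelineEvent(
    stage=PipelineStage.INPUT_RECEIVED,
    operation="chat",
    request_id="req-1",
    messages=[{"role": "user", "content": "Hello"}],
)
# metadata is the place to leave notes for later stages or other extensions
event.metadata["seen_by"] = "example-extension"
```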

### Extension Contract and Discovery

Any object satisfying the `PipelineExtension` protocol can register to receive events. The contract requires a single method:

```python
def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
    ...
```

Extensions are discovered through two mechanisms:

1. **Entry-point discovery** – The manager calls `discover_pipeline_extensions()` to load any package registering under the `headroom.pipeline_extension` entry-point group.
2. **Manual registration** – Pass concrete extension instances directly via the `hooks` parameter when instantiating `PipelineExtensionManager`.
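Both mechanisms can be sketched with the standard library alone. The code below is an illustration of the general technique, not Headroom's `discover_pipeline_extensions()`: the `PipelineExtension` protocol here is a structural stand-in, and `load_extensions` is a hypothetical helper that reads the `headroom.pipeline_extension` group through `importlib.metadata`.

```python
from importlib.metadata import entry_points
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class PipelineExtension(Protocol):
    """Structural stand-in: any object with on_pipeline_event() qualifies."""

    def on_pipeline_event(self, event: Any) -> Optional[Any]:
        ...


def load_extensions(group: str = "headroom.pipeline_extension") -> list:
    """Hypothetical helper: instantiate every extension registered under `group`."""
    eps = entry_points()
    # Newer Pythons expose .select(); older ones return a dict keyed by group.
    selected = eps.select(group=group) if hasattr(eps, "select") else eps.get(group, [])
    extensions = []
    for ep in selected:
        try:
            target = ep.load()
            # Entry points usually name a class; instantiate it if so.
            instance = target() if isinstance(target, type) else target
        except Exception:
            # Skip extensions that fail to import or construct.
            continue
        if isinstance(instance, PipelineExtension):
            extensions.append(instance)
    return extensions
```

In an environment with no packages registered under that group, `load_extensions()` simply returns an empty list.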

### Hook Execution and Fail-Open Behavior

When `manager.emit(stage, …)` is called, the manager iterates through all registered extensions in order. The core emission logic from [`headroom/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/pipeline.py) operates as follows:

```python
event = PipelineEvent(stage=stage, operation=operation, …)
for extension in self._extensions:
    handler = getattr(extension, "on_pipeline_event", None)
    if callable(handler):
        try:
            updated = handler(event)
        except Exception as exc:
            log.warning(
                "pipeline extension %r failed during %s: %s",
                type(extension).__name__, stage.value, exc,
            )
            continue
        if isinstance(updated, PipelineEvent):
            event = updated
return event
```

This implementation guarantees **fail-open behavior**: exceptions from extensions are caught, logged, and do not abort the pipeline. If an extension returns a new `PipelineEvent` instance, that object replaces the current event for all downstream extensions and pipeline stages. Returning `None` leaves the original event unchanged.

The `manager.enabled` property is `True` only when at least one extension is present, allowing the core pipeline to skip the emit loop entirely when no extensions are configured.
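The behavior described above can be exercised with a small self-contained model. `MiniManager` and `Event` below are simplified stand-ins written for this article, not Headroom classes; the loop mirrors the excerpt, and the three extensions show a failure, an in-place mutation, and a replacement.

```python
import logging
from dataclasses import dataclass, field, replace

log = logging.getLogger("pipeline_sketch")


@dataclass
class Event:
    """Simplified stand-in for PipelineEvent."""

    stage: str
    headers: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)


class MiniManager:
    """Simplified stand-in for PipelineExtensionManager."""

    def __init__(self, hooks=None):
        self._extensions = list(hooks or [])

    @property
    def enabled(self) -> bool:
        # True only when at least one extension is registered
        return bool(self._extensions)

    def emit(self, stage: str, **fields) -> Event:
        event = Event(stage=stage, **fields)
        for extension in self._extensions:
            handler = getattr(extension, "on_pipeline_event", None)
            if not callable(handler):
                continue
            try:
                updated = handler(event)
            except Exception as exc:
                # Fail open: log the failure and move on to the next extension
                log.warning("extension %r failed during %s: %s",
                            type(extension).__name__, stage, exc)
                continue
            if isinstance(updated, Event):
                event = updated  # a returned event replaces the current one
        return event


class Broken:
    def on_pipeline_event(self, event):
        raise RuntimeError("boom")


class AddHeader:
    def on_pipeline_event(self, event):
        event.headers["x-trace"] = "on"  # mutate in place, return None
        return None


class Replace:
    def on_pipeline_event(self, event):
        # Return a modified copy instead of mutating the original
        return replace(event, metadata={**event.metadata, "replaced": True})


manager = MiniManager(hooks=[Broken(), AddHeader(), Replace()])
result = manager.emit("pre_send")
```

Even though `Broken` raises, the later extensions still run, and `result` carries both the injected header and the replaced metadata. A manager created without hooks reports `enabled` as `False`, so a caller can guard `emit` with `if manager.enabled:`.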

## Practical Extension Examples

### Inspecting Requests with a Logging Extension

The following extension logs the raw message payload whenever the pipeline reaches the `INPUT_RECEIVED` stage:

```python
# logging_extension.py

import logging

from headroom.pipeline import PipelineEvent, PipelineStage

log = logging.getLogger("headroom.inspector")


class LogInputExtension:
    """Logs request details at the INPUT_RECEIVED stage."""

    def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
        if event.stage is PipelineStage.INPUT_RECEIVED:
            log.info(
                "INPUT_RECEIVED – request_id=%s messages=%s",
                event.request_id, event.messages
            )
        return None
```

### Registering Extensions Manually

You can instantiate the manager with concrete hooks without using entry-point discovery:

```python
# app.py

import logging

from headroom.pipeline import PipelineExtensionManager, PipelineStage
from logging_extension import LogInputExtension

# log.info() output is dropped unless the level is INFO or lower
logging.basicConfig(level=logging.INFO)

# Pass extensions as a list so several can be registered in order
manager = PipelineExtensionManager(hooks=[LogInputExtension()])

# Simulate emitting an event
event = manager.emit(
    PipelineStage.INPUT_RECEIVED,
    operation="chat",
    request_id="req-456",
    provider="openai",
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello, Headroom"}],
)
```

Provided logging is configured at `INFO` level or lower, running this code emits the log line from `LogInputExtension`, and `emit` returns the resulting event to the caller.

### Auto-Discovery via Python Entry Points

For distributed packages, declare the extension in your own package's `setup.cfg` to enable automatic loading:

```ini
[options.entry_points]
headroom.pipeline_extension =
    my_logger = my_package.module:LogInputExtension
```

When `PipelineExtensionManager(discover=True)` is instantiated (the default), it automatically loads `LogInputExtension` from any installed distribution declaring this entry point.

### Mutating Requests and Adding Headers

Extensions can modify the event in place to inject custom headers before the request reaches the LLM provider:

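```python
from headroom.pipeline import PipelineEvent, PipelineStage


class HeaderInjectionExtension:
    """Adds a custom header at the PRE_SEND stage."""

    def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
        if event.stage is PipelineStage.PRE_SEND:
            # headers may be unset, so fall back to an empty dict before writing
            event.headers = event.headers or {}
            event.headers["x-custom-auth"] = "bearer-token-123"
        # Returning None keeps the (now mutated) event in place
        return None
```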

Because the manager reuses the same `PipelineEvent` object, the injected header persists when the request is finally dispatched.

### Short-Circuiting the Pipeline

Extensions can supply a response without calling the LLM by returning a completely new `PipelineEvent` at an early stage, with `stage` set to `RESPONSE_RECEIVED` and `response` already populated:

```python
from headroom.pipeline import PipelineEvent, PipelineStage


class CacheExtension:
    """Returns a cached response without calling the LLM."""

    def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
        if event.stage is PipelineStage.INPUT_RECEIVED:
            cached = self.check_cache(event.messages)
            if cached:
                # Build a replacement event that already carries a response
                return PipelineEvent(
                    stage=PipelineStage.RESPONSE_RECEIVED,
                    operation=event.operation,
                    request_id=event.request_id,
                    provider=event.provider,
                    model=event.model,
                    response={"choices": [{"message": {"role": "assistant", "content": cached}}]},
                    metadata={"cached": True},
                )
        # Cache miss or other stage: leave the event unchanged
        return None

    def check_cache(self, messages):
        # Placeholder lookup; replace with real cache logic
        return "Cached reply" if messages else None
```

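Note that the emit loop shown earlier only substitutes the returned event for downstream extensions and the caller; it does not skip stages by itself. The remaining stages are bypassed only if the calling pipeline code inspects the returned event (for example, its `stage` or `response`) and treats it as already completed.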
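A minimal sketch of the caller-side check that makes such a replacement effective is shown below. Everything in it is hypothetical: `Event` is a reduced stand-in for `PipelineEvent`, and `CachedReply` and `handle_request` are names invented for this illustration, not part of Headroom.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class Event:
    """Reduced stand-in for PipelineEvent with only the fields used here."""

    stage: str
    messages: list = field(default_factory=list)
    response: Any = None


class CachedReply:
    """Illustrative extension that answers from a fixed in-memory cache."""

    def __init__(self, cache: dict):
        self._cache = cache

    def on_pipeline_event(self, event: Event) -> Optional[Event]:
        if event.stage != "input_received" or not event.messages:
            return None
        hit = self._cache.get(event.messages[-1]["content"])
        if hit is None:
            return None
        # Replacement event that already carries the response
        return Event(stage="response_received", messages=event.messages, response=hit)


def handle_request(extension, messages: list, call_provider: Callable[[list], Any]) -> Any:
    """Hypothetical caller: skip the provider only if an extension supplied a response."""
    event = Event(stage="input_received", messages=messages)
    updated = extension.on_pipeline_event(event)
    if isinstance(updated, Event):
        event = updated
    if event.response is not None:
        return event.response  # short-circuit: no provider call
    return call_provider(event.messages)
```

With a cache containing `"hi"`, a request whose last message is `"hi"` returns the cached value and never invokes `call_provider`; any other message falls through to the provider.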

## Summary

- Headroom defines **11 stable pipeline stages** in [`headroom/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/pipeline.py), ranging from `SETUP` through `RESPONSE_RECEIVED`.
- The **PipelineExtensionManager** broadcasts `PipelineEvent` objects to all registered extensions at each stage via the `on_pipeline_event` hook protocol.
- Extensions can **inspect**, **mutate**, or **replace** events; a replacement event that already carries a response lets the calling pipeline short-circuit the remaining stages.
- Fail-open design ensures **exceptions in extensions do not abort** the request flow: they are logged and the failing extension is skipped.
- Registration supports both **manual instantiation** and **automatic discovery** through Python entry points (`headroom.pipeline_extension`).

## Frequently Asked Questions

### What happens if a pipeline extension throws an exception?

The `PipelineExtensionManager` catches the exception, logs a warning message identifying the extension and stage, and continues processing with the current event. This fail-open behavior ensures that a faulty extension cannot crash the LLM request pipeline.

### Can extensions modify the pipeline stage order?

No. The 11 canonical stages are defined as a fixed tuple in [`headroom/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/pipeline.py) and execute in a predetermined sequence. Extensions cannot reorder stages or insert new ones; they can only react to the existing stages or return a replacement event, such as one already marked `RESPONSE_RECEIVED`, that the pipeline can treat as completed.

### How do I register multiple extensions simultaneously?

Pass a list of extension instances to the `hooks` parameter when creating the manager, or rely on entry-point discovery to load all registered extensions from the environment automatically. The manager processes extensions in the order they are registered.

### Is it possible to disable the PipelineExtensionManager at runtime?

No separate on/off switch is described here, but the manager is effectively inactive whenever no extensions are registered. In that case `manager.enabled` is `False`, and the core pipeline code can skip calling `manager.emit()` entirely, avoiding hook overhead in deployments that do not use extensions.