Pipeline Stages in Headroom and How PipelineExtensionManager Hooks Work

Headroom processes every request through 11 canonical lifecycle stages defined in headroom/pipeline.py. At each stage, the PipelineExtensionManager broadcasts a PipelineEvent to registered extensions through the on_pipeline_event hook, so extensions can inspect, mutate, or replace the event without aborting the pipeline flow.

The chopratejas/headroom repository implements a request-processing architecture with a fixed stage order and an extension layer on top of it. Understanding the precise order of pipeline stages and the hook mechanism is essential for developers building custom middleware, logging agents, or request transformers that integrate with the Headroom LLM proxy.

The 11 Canonical Pipeline Stages

Headroom defines its processing lifecycle in the PipelineStage enum within headroom/pipeline.py. These stages execute in a strict, stable sequence that never changes without a deprecation cycle, ensuring extensions can rely on specific hook points.

  • SETUP – Initial configuration before any request handling begins.
  • PRE_START – Preparations that run just before the pipeline officially starts.
  • POST_START – Work that occurs immediately after the pipeline has started.
  • INPUT_RECEIVED – The raw request payload has been received from the client.
  • INPUT_CACHED – The request (or parts of it) has been stored in the request-cache.
  • INPUT_ROUTED – Content-routing logic decides which transforms will apply to this request.
  • INPUT_COMPRESSED – The request body undergoes compression (e.g., token-reduction strategies).
  • INPUT_REMEMBERED – Memory-related processing executes, such as retrieval from a vector store.
  • PRE_SEND – The request is about to be dispatched to the downstream LLM or provider.
  • POST_SEND – The request was sent; post-send bookkeeping runs.
  • RESPONSE_RECEIVED – The LLM response arrives and is ready for further processing or forwarding.

These stages form the CANONICAL_PIPELINE_STAGES tuple, providing predictable interception points for observability and modification.
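As a rough illustration of that structure, the stage list can be modelled as a string-valued enum plus a tuple built from it. This is a stand-in sketch, not a copy of headroom/pipeline.py: the member names come from the list above, while the lowercase string values, the str mixin, and the stage_index helper are assumptions made for the example.

```python
from enum import Enum


class PipelineStage(str, Enum):
    """Stand-in for Headroom's PipelineStage; the string values are assumed."""

    SETUP = "setup"
    PRE_START = "pre_start"
    POST_START = "post_start"
    INPUT_RECEIVED = "input_received"
    INPUT_CACHED = "input_cached"
    INPUT_ROUTED = "input_routed"
    INPUT_COMPRESSED = "input_compressed"
    INPUT_REMEMBERED = "input_remembered"
    PRE_SEND = "pre_send"
    POST_SEND = "post_send"
    RESPONSE_RECEIVED = "response_received"


# Iterating an Enum follows definition order, so the tuple keeps the sequence.
CANONICAL_PIPELINE_STAGES = tuple(PipelineStage)


def stage_index(stage: PipelineStage) -> int:
    """Hypothetical helper: zero-based position of a stage in the order."""
    return CANONICAL_PIPELINE_STAGES.index(stage)
```

A helper like stage_index lets an extension compare stages by position, for example to act only on events that occur before PRE_SEND.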

How PipelineExtensionManager Dispatches Hooks

The PipelineExtensionManager class (also in headroom/pipeline.py) serves as the central dispatcher for the pipeline’s extensibility layer. Its primary responsibility is to instantiate PipelineEvent objects and broadcast them to every registered extension at the current stage.

The PipelineEvent Dataclass

Each hook invocation receives a PipelineEvent object containing the full context of the current request:

  • stage – The current PipelineStage enum value.
  • operation – The operation name (e.g., "chat").
  • request_id – Unique identifier for the request.
  • provider and model – Target LLM provider and model identifiers.
  • messages, tools, headers – Payload components being sent to the provider.
  • response – The raw response from the LLM (populated in later stages).
  • metadata – A free-form dictionary for passing data between stages or extensions.
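To make the field list concrete, here is a minimal stand-in dataclass with the same field names. The types, the defaults, and the use of a plain string for stage are assumptions for illustration; the real PipelineEvent in headroom/pipeline.py may declare them differently.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class PipelineEvent:
    """Stand-in with the fields listed above; types and defaults are assumed."""

    stage: str  # the real class carries a PipelineStage enum member
    operation: str
    request_id: Optional[str] = None
    provider: Optional[str] = None
    model: Optional[str] = None
    messages: Optional[list] = None
    tools: Optional[list] = None
    headers: Optional[dict] = None
    response: Any = None  # only populated in later stages
    metadata: dict = field(default_factory=dict)  # fresh dict per event


event = PipelineEvent(stage="input_received", operation="chat", request_id="req-1")
event.metadata["trace_id"] = "abc"  # free-form data for later extensions
```

Using default_factory for metadata gives every event its own dictionary, so notes left by one request never leak into another.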

Extension Contract and Discovery

Any object satisfying the PipelineExtension protocol can register to receive events. The contract requires a single method:

def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
    ...

Extensions are discovered through two mechanisms:

  1. Entry-point discovery – The manager calls discover_pipeline_extensions() to load any package registering under the headroom.pipeline_extension entry-point group.
  2. Manual registration – Pass concrete extension instances directly via the hooks parameter when instantiating PipelineExtensionManager.

Hook Execution and Fail-Open Behavior

When manager.emit(stage, …) is called, the manager iterates through all registered extensions in order. The core emission logic from headroom/pipeline.py operates as follows:

event = PipelineEvent(stage=stage, operation=operation, …)
for extension in self._extensions:
    handler = getattr(extension, "on_pipeline_event", None)
    if callable(handler):
        try:
            updated = handler(event)
        except Exception as exc:
            log.warning(
                "pipeline extension %r failed during %s: %s",
                type(extension).__name__, stage.value, exc,
            )
            continue
        if isinstance(updated, PipelineEvent):
            event = updated
return event

This implementation guarantees fail-open behavior: exceptions from extensions are caught, logged, and do not abort the pipeline. If an extension returns a new PipelineEvent instance, that object replaces the current event for all downstream extensions and pipeline stages. Returning None leaves the original event unchanged.
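The behavior is easy to check in isolation. The following self-contained sketch re-creates the same loop shape against a tiny stand-in Event class (not Headroom's PipelineEvent) and runs three hypothetical extensions: one that raises, one that returns a replacement event, and one that only observes.

```python
import logging
from dataclasses import dataclass, field, replace

log = logging.getLogger("pipeline.demo")


@dataclass
class Event:
    """Tiny stand-in for PipelineEvent."""

    stage: str
    metadata: dict = field(default_factory=dict)


def emit(extensions, event):
    """Same loop shape as the snippet above, applied to the stand-in Event."""
    for extension in extensions:
        handler = getattr(extension, "on_pipeline_event", None)
        if not callable(handler):
            continue
        try:
            updated = handler(event)
        except Exception as exc:
            log.warning("extension %r failed: %s", type(extension).__name__, exc)
            continue  # fail open: move on to the next extension
        if isinstance(updated, Event):
            event = updated  # later extensions see the replacement
    return event


class Broken:
    def on_pipeline_event(self, event):
        raise RuntimeError("boom")


class Tagger:
    def on_pipeline_event(self, event):
        return replace(event, metadata={**event.metadata, "tagged": True})


class Observer:
    def __init__(self):
        self.seen = None

    def on_pipeline_event(self, event):
        self.seen = event.metadata.get("tagged")
        return None  # leave the event unchanged


observer = Observer()
result = emit([Broken(), Tagger(), observer], Event(stage="pre_send"))
```

Broken's exception is logged and skipped, Tagger's replacement becomes the current event, and Observer, registered last, already sees the tagged metadata.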

The manager.enabled property is True only when at least one extension is present, allowing the core pipeline to skip the emit loop entirely when no extensions are configured.

Practical Extension Examples

Inspecting Requests with a Logging Extension

The following extension logs the raw message payload whenever the pipeline reaches the INPUT_RECEIVED stage:


# logging_extension.py

import logging
from headroom.pipeline import PipelineEvent, PipelineStage

log = logging.getLogger("headroom.inspector")

class LogInputExtension:
    """Logs request details at the INPUT_RECEIVED stage."""
    def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
        if event.stage is PipelineStage.INPUT_RECEIVED:
            log.info(
                "INPUT_RECEIVED – request_id=%s messages=%s",
                event.request_id, event.messages
            )
        return None

Registering Extensions Manually

You can instantiate the manager with concrete hooks without using entry-point discovery:


# app.py

from headroom.pipeline import PipelineExtensionManager, PipelineStage
from logging_extension import LogInputExtension

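# hooks takes a list of extension instances; discover=False skips entry-point loading.
manager = PipelineExtensionManager(hooks=[LogInputExtension()], discover=False)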

# Simulate emitting an event

event = manager.emit(
    PipelineStage.INPUT_RECEIVED,
    operation="chat",
    request_id="req-456",
    provider="openai",
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello, Headroom"}],
)

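Running this code invokes LogInputExtension once. The log line appears only if logging is configured at INFO level or lower, and emit returns the event unchanged because the extension returns None.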

Auto-Discovery via Python Entry Points

For distributed packages, declare the extension in setup.cfg to enable automatic loading:

[options.entry_points]
headroom.pipeline_extension =
    my_logger = my_package.module:LogInputExtension

When PipelineExtensionManager(discover=True) is instantiated (the default), it automatically loads LogInputExtension from any installed distribution declaring this entry point.

Mutating Requests and Adding Headers

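Extensions can modify the event in place to inject custom headers before the request reaches the LLM provider:

from headroom.pipeline import PipelineEvent, PipelineStage

class HeaderInjectionExtension:
    """Adds a custom header at the PRE_SEND stage."""
    def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
        if event.stage is PipelineStage.PRE_SEND:
            # Start from an empty dict when no headers have been set yet.
            event.headers = event.headers or {}
            # Placeholder value; load real credentials from configuration.
            event.headers["x-custom-auth"] = "bearer-token-123"
        # Returning None keeps the same (now mutated) event object.
        return None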

Because the manager reuses the same PipelineEvent object, the injected header persists when the request is finally dispatched.
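If mutating a shared object is undesirable, an extension can instead return a modified copy, since the manager accepts a returned event as the replacement. The sketch below uses dataclasses.replace on a stand-in Event class; whether the real PipelineEvent supports replace in exactly this way is an assumption based on it being described as a dataclass, and the header name is made up for the example.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass
class Event:
    """Stand-in for PipelineEvent with only the fields this example needs."""

    stage: str
    headers: Optional[dict] = None


class CopyingHeaderExtension:
    """Returns a new event with an extra header instead of mutating in place."""

    def on_pipeline_event(self, event):
        if event.stage != "pre_send":
            return None  # not our stage: leave the event untouched
        headers = dict(event.headers or {})  # copy so the original is not shared
        headers["x-request-source"] = "demo"
        return replace(event, headers=headers)


original = Event(stage="pre_send", headers={"accept": "application/json"})
updated = CopyingHeaderExtension().on_pipeline_event(original)
```

The copy-based variant leaves the original event intact, which makes the extension easier to unit test and avoids surprising other code that still holds a reference to the old event.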

Short-Circuiting the Pipeline

An extension can also return a completely new PipelineEvent at an early stage, for example one already marked RESPONSE_RECEIVED and carrying a cached response, with the aim of bypassing the LLM call:

from headroom.pipeline import PipelineEvent, PipelineStage

class CacheExtension:
    """Returns a cached response without calling the LLM."""
    def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None:
        if event.stage is PipelineStage.INPUT_RECEIVED:
            cached = self.check_cache(event.messages)
            if cached:
                return PipelineEvent(
                    stage=PipelineStage.RESPONSE_RECEIVED,
                    operation=event.operation,
                    request_id=event.request_id,
                    provider=event.provider,
                    model=event.model,
                    response={"choices": [{"message": {"role": "assistant", "content": cached}}]},
                    metadata={"cached": True},
                )
        return None

    def check_cache(self, messages):
        # Placeholder lookup; replace with a real cache query.
        return "Cached reply" if messages else None

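As the emit loop shown earlier makes clear, the manager itself only passes the replacement event to the remaining extensions and returns it to the caller. Skipping the remaining stages therefore depends on the downstream pipeline code treating a RESPONSE_RECEIVED event as already completed.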
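One way downstream code could treat such an event as completed is to inspect what emit returns before calling the provider. The sketch below is hypothetical: handle_request, the stand-in Event class, and the string stage names are illustrative and do not come from Headroom's source.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Event:
    """Stand-in for PipelineEvent with only the fields this example needs."""

    stage: str
    messages: list = field(default_factory=list)
    response: Any = None
    metadata: dict = field(default_factory=dict)


def handle_request(
    emit: Callable[[Event], Event],
    call_provider: Callable[[list], Any],
    messages: list,
) -> Any:
    """Emit INPUT_RECEIVED, then skip the provider if an extension answered."""
    event = emit(Event(stage="input_received", messages=messages))
    if event.stage == "response_received" and event.response is not None:
        return event.response  # an extension supplied the reply
    return call_provider(event.messages)


def cached_emit(event: Event) -> Event:
    # Simulates an extension that replaced the event with a cached reply.
    return Event(stage="response_received", response="cached", metadata={"cached": True})


reply = handle_request(cached_emit, lambda msgs: "live", [{"role": "user", "content": "hi"}])
```

Checking both the stage and the presence of a response guards against an extension that changes the stage but forgets to attach a reply.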

Summary

  • Headroom defines 11 stable pipeline stages in headroom/pipeline.py, ranging from SETUP through RESPONSE_RECEIVED.
  • The PipelineExtensionManager broadcasts PipelineEvent objects to all registered extensions at each stage via the on_pipeline_event hook protocol.
  • Extensions can inspect, mutate, or replace events; a returned PipelineEvent replaces the current one for everything downstream, which can be used to short-circuit to a cached response.
  • Fail-open design ensures exceptions in extensions do not abort the request flow; they are logged and the failing extension is skipped.
  • Registration supports both manual instantiation and automatic discovery through Python entry points (headroom.pipeline_extension).

Frequently Asked Questions

What happens if a pipeline extension throws an exception?

The PipelineExtensionManager catches the exception, logs a warning message identifying the extension and stage, and continues processing with the current event. This fail-open behavior ensures that a faulty extension cannot crash the LLM request pipeline.

Can extensions modify the pipeline stage order?

No. The 11 canonical stages are defined as a fixed tuple in headroom/pipeline.py and execute in a predetermined sequence. Extensions cannot reorder stages or insert new ones; they can only react to the existing stages or replace the event, for example with one already marked RESPONSE_RECEIVED.

How do I register multiple extensions simultaneously?

Pass a list of extension instances to the hooks parameter when creating the manager, or rely on entry-point discovery to load all registered extensions from the environment automatically. The manager processes extensions in the order they are registered.

Is it possible to disable the PipelineExtensionManager at runtime?

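Only indirectly. The manager.enabled property is not a switch; it reports whether any extensions are registered and is False when there are none. The core pipeline code can skip calling manager.emit() entirely when enabled is False, so deployments that do not need extensions avoid the hook overhead. Because discovery is on by default, running with no extensions means passing no hooks and instantiating the manager with discover=False.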
