# Headroom Transform Pipeline Architecture: How It Works and How to Extend It

> Explore Headroom's transform pipeline architecture for LLM requests. Learn how it compresses and normalizes tokens using a modular approach and discover how to extend its capabilities.

- Repository: [Tejas Chopra/headroom](https://github.com/chopratejas/headroom)
- Tags: architecture
- Published: 2026-06-09

---

**Headroom processes every LLM request through a deterministic, provider-agnostic transform pipeline. Modular compression and normalization transforms, built on the `Transform` base class and run by the `TransformPipeline` orchestrator, reduce token usage while preserving semantic information.**

Headroom is an open-source LLM optimization library that intercepts API requests and routes them through a configurable transform pipeline before reaching the provider. Understanding the Headroom transform pipeline architecture is essential for developers who want to optimize token costs or add custom preprocessing logic. The pipeline is implemented as an ordered chain of `Transform` objects defined in [`headroom/transforms/base.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/base.py) and orchestrated by `TransformPipeline` in [`headroom/transforms/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/pipeline.py).

## Core Components of the Pipeline Architecture

The Headroom transform pipeline consists of five primary components, each handling a specific responsibility in the preprocessing chain:

- **`TransformPipeline`** – Orchestrates ordered execution of transforms and aggregates results. Located in [`headroom/transforms/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/pipeline.py).
- **`Transform` base class** – Defines the common interface (`apply`, `should_apply`, `name`) that all transforms must implement. Found in [`headroom/transforms/base.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/base.py).
- **`CacheAligner`** – Normalizes dynamic prefixes like dates and UUIDs in system prompts to improve provider cache hit rates. Source: [`headroom/transforms/cache_aligner.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/cache_aligner.py).
- **`ContentRouter`** – Inspects message content types and dispatches them to specialized compressors (SmartCrusher, CodeCompressor, etc.). Source: [`headroom/transforms/content_router.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/content_router.py).
- **`SmartCrusher`** – A Rust-backed statistical compressor for JSON arrays with CCR (Compress-Cache-Retrieve) integration. Source: [`headroom/transforms/smart_crusher.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/smart_crusher.py).
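
To make the `CacheAligner` entry above more concrete, here is a minimal sketch of prefix normalization. It assumes that dynamic fragments are swapped for fixed placeholders; the function name `stabilize_prefix`, the placeholder strings, and the regular expressions are illustrative and are not taken from `cache_aligner.py`, whose actual strategy may differ.

```python
import re

# Hypothetical patterns for two kinds of dynamic fragments.
UUID_RE = re.compile(
    r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-"
    r"[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
)
ISO_DATE_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}\b")


def stabilize_prefix(text):
    """Replace UUIDs and ISO dates with fixed placeholders (illustrative only)."""
    text = UUID_RE.sub("<UUID>", text)      # UUIDs first, so their digits are gone
    text = ISO_DATE_RE.sub("<DATE>", text)  # then calendar dates
    return text


monday = stabilize_prefix(
    "Today is 2026-06-08. Session 123e4567-e89b-12d3-a456-426614174000."
)
tuesday = stabilize_prefix(
    "Today is 2026-06-09. Session 9b2c1d3e-0f4a-4b5c-8d6e-7f8091a2b3c4."
)

# Both prompts now share an identical prefix, which is what a
# provider-side prompt cache needs in order to hit.
print(monday == tuesday)
```

The point of the sketch is only that two requests differing in volatile details end up with byte-identical system prompts.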

## Pipeline Orchestration and Execution Flow

The `TransformPipeline.apply` method in [`headroom/transforms/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/pipeline.py) executes transforms through a deterministic five-step process:

1. **Token counting** – The original message list is tokenized using a model-specific `Tokenizer`.
2. **Deep copy** – A mutable copy of the messages is created via `deep_copy_messages` to prevent side effects.
3. **Transform iteration** – The pipeline iterates over configured transforms in order. For each transform:
   - `should_apply` evaluates whether the transform should run for the current payload.
   - `apply` receives the current messages and `Tokenizer`, returning a `TransformResult` containing mutated messages, token counts, and applied transform identifiers.
4. **Aggregation** – The pipeline aggregates `transforms_applied`, `markers_inserted`, `warnings`, and timing information.
5. **Final recount** – After the last transform, a final full token recount produces the definitive `TransformResult`.

OpenTelemetry spans optionally record model, provider, token deltas, and per-transform duration for observability.
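
The five steps above can be sketched as a small, self-contained loop. This is not the code in `pipeline.py`: `SketchResult`, `count_tokens`, `DropFiller`, and `run_pipeline` are stand-ins invented for illustration, and the word-based token counter is a deliberate simplification of a model-specific tokenizer.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class SketchResult:
    """Stand-in for TransformResult; field names follow the article."""
    messages: list
    tokens_before: int
    tokens_after: int
    transforms_applied: list = field(default_factory=list)


def count_tokens(messages):
    """Toy counter: whitespace-separated words in string content."""
    return sum(
        len(m["content"].split())
        for m in messages
        if isinstance(m.get("content"), str)
    )


class DropFiller:
    """Toy transform that removes a couple of filler words."""
    name = "drop_filler"
    FILLER = {"please", "kindly"}

    def should_apply(self, messages):
        return any(
            word.lower() in self.FILLER
            for m in messages
            if isinstance(m.get("content"), str)
            for word in m["content"].split()
        )

    def apply(self, messages):
        for m in messages:
            if isinstance(m.get("content"), str):
                kept = [w for w in m["content"].split() if w.lower() not in self.FILLER]
                m["content"] = " ".join(kept)
        return messages


def run_pipeline(messages, transforms):
    tokens_before = count_tokens(messages)   # 1. count the original tokens
    working = copy.deepcopy(messages)        # 2. deep copy to avoid side effects
    applied = []
    for transform in transforms:             # 3. iterate in configured order
        if not transform.should_apply(working):
            continue
        working = transform.apply(working)
        applied.append(transform.name)       # 4. aggregate what ran
    tokens_after = count_tokens(working)     # 5. final full recount
    return SketchResult(working, tokens_before, tokens_after, applied)


original = [{"role": "user", "content": "Please summarize this report kindly"}]
result = run_pipeline(original, [DropFiller()])
print(result.transforms_applied, result.tokens_before, result.tokens_after)
```

Because the loop works on a deep copy, the toy transform can mutate in place without touching the caller's `original` list.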

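```python
from headroom.transforms.pipeline import TransformPipeline

# Example conversation in the usual chat-message format
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarize the attached tool output."},
]

pipeline = TransformPipeline()
result = pipeline.apply(messages, model="gpt-4o", model_limit=128_000)

print(result.transforms_applied)   # e.g., ["cache_aligner", "smart:lossless"]
print(result.tokens_before, result.tokens_after)
```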

## Default Transform Order and Logic

The default pipeline, constructed by `_build_default_transforms`, executes transforms in the following order:

1. **Tool-result interceptors** – Optional early-stage transforms that pre-shrink tool output before main compressors run.
2. **Cache Aligner** – Stabilizes dynamic fragments in system prompts to maximize cache efficiency.
3. **Content Router** – A content-aware dispatcher that selects specialized compressors based on message type:
   - JSON arrays → `SmartCrusher`
   - Plain text → `Kompress` (optional ML compressor) or passthrough
   - Code blocks → `CodeCompressor` (AST-aware)
   - Logs → `LogCompressor`
   - Search results → `SearchCompressor`
   - HTML → `HTMLExtractor`

Modern Headroom versions (post Phase-B PR-B1) mutate only message *content* rather than dropping entire messages from the list.
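
The routing step can be illustrated with a small dispatcher. This is a sketch under assumptions: `detect_kind`, `compact_json`, `strip_tags`, and `route` are hypothetical names, the detection heuristics are far cruder than a real router would need, and the handlers are trivial placeholders rather than `SmartCrusher` or `HTMLExtractor`.

```python
import json
import re


def detect_kind(content):
    """Rough content sniffing; real detection is likely more careful."""
    stripped = content.strip()
    if stripped.startswith("["):
        try:
            if isinstance(json.loads(stripped), list):
                return "json_array"
        except json.JSONDecodeError:
            pass
    if stripped.lower().startswith(("<!doctype html", "<html")):
        return "html"
    return "text"


def compact_json(content):
    """Placeholder JSON handler: re-serialize without optional whitespace."""
    return json.dumps(json.loads(content), separators=(",", ":"))


def strip_tags(content):
    """Placeholder HTML handler: drop tags and collapse whitespace."""
    return " ".join(re.sub(r"<[^>]+>", " ", content).split())


HANDLERS = {
    "json_array": compact_json,
    "html": strip_tags,
    "text": lambda content: content,  # passthrough
}


def route(message):
    """Return a copy of the message with compressed content, plus the detected kind."""
    content = message.get("content")
    if not isinstance(content, str):
        return message, "skipped"
    kind = detect_kind(content)
    return {**message, "content": HANDLERS[kind](content)}, kind


json_msg, json_kind = route({"role": "tool", "content": '[ {"id": 1}, {"id": 2} ]'})
html_msg, html_kind = route({"role": "tool", "content": "<html><body><p>Hello</p></body></html>"})
text_msg, text_kind = route({"role": "user", "content": "plain question"})
print(json_kind, html_kind, text_kind)
```

Note that `route` only rewrites the `content` field and always returns one message per input message, in line with the content-only mutation described above.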

## The SmartCrusher: JSON Compression Engine

`SmartCrusher` serves as the default JSON compressor and operates as a thin Python wrapper around a Rust implementation (`headroom._core`). It detects statistical patterns including time-series spikes and clusters, keeps change-points, factors out constants, and optionally compacts arrays into CSV-style schema strings.

The component emits **CCR markers** (`<<ccr:HASH …>>`) when rows are dropped, storing original payloads in a short-lived in-process cache (`compression_store` in [`headroom/cache/compression_store.py`](https://github.com/chopratejas/headroom/blob/main/headroom/cache/compression_store.py)). Compression outcomes are recorded to the **TOIN** learning system ([`headroom/telemetry/toin.py`](https://github.com/chopratejas/headroom/blob/main/headroom/telemetry/toin.py)) to adapt future compression policies.
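
As a rough illustration of the marker-and-store idea, the sketch below drops rows from an array, keeps the originals in a dictionary, and leaves a marker behind. Everything here is assumed for illustration: the plain `dict` store, the 12-character SHA-256 key, the naive "keep the first rows" policy, and in particular the text after the hash, since the article elides that part of the marker format. The real `SmartCrusher` chooses rows statistically and runs in Rust.

```python
import hashlib
import json
import re

# Stand-in for the in-process compression store (hypothetical: a plain dict).
compression_store = {}

# Matches "<<ccr:HASH ...>>" and captures the hash.
MARKER_RE = re.compile(r"<<ccr:([0-9a-f]+)[^>]*>>")


def crush_rows(rows, keep=2):
    """Keep the first `keep` rows; stash the full array and emit a marker."""
    if len(rows) <= keep:
        return json.dumps(rows)
    payload = json.dumps(rows, sort_keys=True)
    key = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]
    compression_store[key] = rows
    dropped = len(rows) - keep
    # The marker suffix below is made up for this sketch.
    return f"{json.dumps(rows[:keep])} <<ccr:{key} {dropped} rows omitted>>"


def retrieve_original(text):
    """Look up the original rows referenced by the first marker, if any."""
    match = MARKER_RE.search(text)
    if match is None:
        return None
    return compression_store.get(match.group(1))


rows = [{"t": i, "v": 10} for i in range(5)]
crushed = crush_rows(rows)
restored = retrieve_original(crushed)
print(crushed)
print(restored == rows)
```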

## How to Extend the Pipeline with Custom Transforms

Extending the Headroom transform pipeline requires subclassing `Transform` and defining three members: a `name` attribute and the `should_apply` and `apply` methods.

First, create a subclass that defines:
- `name` – A short string identifier used in logs and `transforms_applied` arrays.
- `should_apply(messages, tokenizer, **kwargs)` – Returns `True` only when the transform is relevant (e.g., for messages exceeding a token threshold).
- `apply(messages, tokenizer, **kwargs)` – Returns a `TransformResult` with mutated messages and accurate token counts.

```python
# my_transform.py
from headroom.transforms.base import Transform, TransformResult


class MyUpperCaseTransform(Transform):
    name = "uppercase"

    def should_apply(self, messages, tokenizer, **kwargs):
        # Run only when some user message has more than 100 tokens of text.
        return any(
            isinstance(m.get("content"), str)
            and tokenizer.count_text(m["content"]) > 100
            for m in messages
            if m.get("role") == "user"
        )

    def apply(self, messages, tokenizer, **kwargs):
        # Count before changing anything so tokens_before reflects the input.
        tokens_before = tokenizer.count_messages(messages)

        new_messages = []
        for m in messages:
            if isinstance(m.get("content"), str):
                # Build a new dict so the caller's message is left untouched.
                m = {**m, "content": m["content"].upper()}
            new_messages.append(m)

        return TransformResult(
            messages=new_messages,
            tokens_before=tokens_before,
            tokens_after=tokenizer.count_messages(new_messages),
            transforms_applied=[self.name],
            markers_inserted=[],
            warnings=[],
        )
```

Second, inject the transform into the pipeline by appending it to the transform list. Order matters because transforms execute sequentially:

```python
from headroom.transforms.pipeline import TransformPipeline
from my_transform import MyUpperCaseTransform

pipeline = TransformPipeline(transforms=[
    *TransformPipeline()._build_default_transforms(),
    MyUpperCaseTransform(),  # Executes after defaults
])

# `messages` is the chat-message list to be sent to the provider
result = pipeline.apply(messages, model="gpt-4o", model_limit=128_000)
```

Third, optionally expose configuration knobs in `HeadroomConfig` ([`headroom/config.py`](https://github.com/chopratejas/headroom/blob/main/headroom/config.py)) to enable or disable the transform via environment variables or config files.

## Hook-Based Customization Options

For tighter integration without modifying the pipeline order, Headroom provides a **hooks system** that runs at `preCompress` and `postCompress` stages. These hooks execute before and after the entire pipeline, allowing arbitrary logic or message mutation.
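
The article does not show how hooks are registered, so the following is only a sketch of the control flow, not Headroom's hooks API. The function `run_with_hooks` and its `pre_compress` and `post_compress` parameters are hypothetical, and `toy_compress` stands in for the whole pipeline.

```python
import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")


def run_with_hooks(messages, compress, pre_compress=None, post_compress=None):
    """Run optional hooks before and after a compression callable."""
    if pre_compress is not None:
        messages = pre_compress(messages)
    messages = compress(messages)
    if post_compress is not None:
        messages = post_compress(messages)
    return messages


def redact_emails(messages):
    """Example pre hook: mask e-mail addresses before compression."""
    return [{**m, "content": EMAIL_RE.sub("[email]", m["content"])} for m in messages]


def toy_compress(messages):
    """Stand-in for the pipeline: collapse runs of whitespace."""
    return [{**m, "content": " ".join(m["content"].split())} for m in messages]


audit_log = []


def record_sizes(messages):
    """Example post hook: record the total character count, leave messages as-is."""
    audit_log.append(sum(len(m["content"]) for m in messages))
    return messages


out = run_with_hooks(
    [{"role": "user", "content": "Mail   bob@example.com   now"}],
    toy_compress,
    pre_compress=redact_emails,
    post_compress=record_sizes,
)
print(out[0]["content"], audit_log)
```

A pre hook suits work that must happen regardless of which transforms run, such as redaction; a post hook suits auditing or metrics on the final payload.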

Advanced use cases can also wrap existing transforms with decorators that inspect pipeline state via `kwargs["last_transform"]` to implement conditional execution based on previous transform results.
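
One way to picture such a wrapper is shown below. It assumes that `kwargs["last_transform"]` carries the name of the previously applied transform as a string, which the article does not spell out. `OnlyAfter` and `StubTransform` are hypothetical classes written for this sketch and do not subclass Headroom's `Transform`.

```python
class StubTransform:
    """Minimal stand-in with the interface described in the article."""
    name = "stub"

    def should_apply(self, messages, tokenizer, **kwargs):
        return True

    def apply(self, messages, tokenizer, **kwargs):
        return messages


class OnlyAfter:
    """Wrap a transform so it runs only right after a named transform."""

    def __init__(self, inner, required_previous):
        self.inner = inner
        self.required_previous = required_previous
        self.name = inner.name  # report under the wrapped transform's name

    def should_apply(self, messages, tokenizer, **kwargs):
        # Assumption: last_transform is the previous transform's name.
        if kwargs.get("last_transform") != self.required_previous:
            return False
        return self.inner.should_apply(messages, tokenizer, **kwargs)

    def apply(self, messages, tokenizer, **kwargs):
        return self.inner.apply(messages, tokenizer, **kwargs)


wrapped = OnlyAfter(StubTransform(), required_previous="cache_aligner")
runs_after_aligner = wrapped.should_apply([], None, last_transform="cache_aligner")
runs_after_other = wrapped.should_apply([], None, last_transform="something_else")
print(runs_after_aligner, runs_after_other)
```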

## Summary

- **Headroom's architecture** centers on `TransformPipeline` in [`headroom/transforms/pipeline.py`](https://github.com/chopratejas/headroom/blob/main/headroom/transforms/pipeline.py), which orchestrates an ordered chain of `Transform` objects.
- **Default execution order** is: tool interceptors → `CacheAligner` → `ContentRouter` (which dispatches to specialized compressors like `SmartCrusher`).
- **Extension mechanism** requires subclassing `Transform`, implementing `name`, `should_apply`, and `apply`, then injecting the instance into the pipeline transforms list.
- **SmartCrusher** provides Rust-backed JSON compression with CCR markers for retrievable data and integrates with the TOIN learning system.
- **Hook system** offers `preCompress` and `postCompress` integration points for logic that must run outside the standard transform sequence.

## Frequently Asked Questions

### What is the difference between the Cache Aligner and Content Router transforms?

**`CacheAligner`** stabilizes dynamic prefixes (dates, UUIDs, random identifiers) in system prompts to ensure provider-side caches hit consistently, whereas **`ContentRouter`** inspects message content types and dispatches them to specialized compressors (JSON to `SmartCrusher`, code to `CodeCompressor`, etc.). The Cache Aligner runs earlier in the pipeline to normalize content before routing decisions occur.

### How do I disable a specific transform in the Headroom pipeline?

You can disable transforms via `HeadroomConfig` in [`headroom/config.py`](https://github.com/chopratejas/headroom/blob/main/headroom/config.py). Instantiate the configuration class, set the specific transform's enabled flag to `False` (e.g., `cfg.cache_aligner.enabled = False`), and pass the config to `TransformPipeline(config=cfg)`. Alternatively, construct a custom pipeline by calling `_build_default_transforms()`, filtering out unwanted transforms, and passing the filtered list to the `TransformPipeline` constructor.
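
The filtering approach amounts to ordinary list manipulation. In the sketch below, `Stub` objects stand in for the list that `_build_default_transforms()` would return, the helper `without` is hypothetical, and the `name` value `"content_router"` is a guess (only `"cache_aligner"` appears in the article's sample output).

```python
class Stub:
    """Stand-in for a transform instance; only the name matters here."""

    def __init__(self, name):
        self.name = name


# In real code this list would come from _build_default_transforms().
defaults = [Stub("cache_aligner"), Stub("content_router")]


def without(transforms, *names):
    """Return a new list that omits transforms with the given names."""
    blocked = set(names)
    return [t for t in transforms if t.name not in blocked]


filtered = without(defaults, "cache_aligner")
print([t.name for t in filtered])
```

The filtered list would then be passed to the `TransformPipeline` constructor as described above.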

### Can I insert a custom transform between existing default transforms?

Yes. When constructing the pipeline, expand `_build_default_transforms()` into a list, insert your custom transform at the desired index position, and pass the modified list to the `TransformPipeline` constructor. Since transforms execute sequentially and may depend on prior normalization (e.g., Cache Aligner should precede Content Router), ensure your insertion point respects these dependencies.
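
Inserting by name rather than by a hard-coded index is a little more robust if the default list changes. The helper `insert_before` and the `Stub` class below are hypothetical, and the transform names are assumed values for illustration.

```python
class Stub:
    """Stand-in for a transform instance; only the name matters here."""

    def __init__(self, name):
        self.name = name


def insert_before(transforms, new_transform, target_name):
    """Return a new list with new_transform placed just before target_name."""
    for index, transform in enumerate(transforms):
        if transform.name == target_name:
            return transforms[:index] + [new_transform] + transforms[index:]
    raise ValueError(f"no transform named {target_name!r}")


# In real code this list would come from _build_default_transforms().
defaults = [Stub("cache_aligner"), Stub("content_router")]

# Place the custom step after cache alignment but before routing.
ordered = insert_before(defaults, Stub("uppercase"), "content_router")
print([t.name for t in ordered])
```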

### What is the purpose of CCR markers in the SmartCrusher?

**CCR markers** (`<<ccr:HASH …>>`) are placeholders emitted by `SmartCrusher` when it removes rows from JSON arrays during compression. These markers reference the original payload stored in the short-lived `compression_store` ([`headroom/cache/compression_store.py`](https://github.com/chopratejas/headroom/blob/main/headroom/cache/compression_store.py)). If the LLM response requests the compressed data, Headroom can retrieve the original rows from the cache and inject them back into the conversation, ensuring no information is permanently lost during aggressive compression.