Headroom Transform Pipeline Architecture: How It Works and How to Extend It
Headroom processes every LLM request through a deterministic, provider-agnostic transform pipeline. Modular compression and normalization transforms, built on the Transform base class and run by the TransformPipeline orchestrator, reduce token usage while preserving semantic information.
Headroom is an open-source LLM optimization library that intercepts API requests and routes them through a configurable transform pipeline before reaching the provider. Understanding the Headroom transform pipeline architecture is essential for developers who want to optimize token costs or add custom preprocessing logic. The pipeline is implemented as an ordered chain of Transform objects defined in headroom/transforms/base.py and orchestrated by TransformPipeline in headroom/transforms/pipeline.py.
Core Components of the Pipeline Architecture
The Headroom transform pipeline consists of five primary components, each handling a specific responsibility in the preprocessing chain:
- TransformPipeline – Orchestrates ordered execution of transforms and aggregates results. Located in headroom/transforms/pipeline.py.
- Transform base class – Defines the common interface (apply, should_apply, name) that all transforms must implement. Found in headroom/transforms/base.py.
- CacheAligner – Normalizes dynamic prefixes like dates and UUIDs in system prompts to improve provider cache hit rates. Source: headroom/transforms/cache_aligner.py.
- ContentRouter – Inspects message content types and dispatches them to specialized compressors (SmartCrusher, CodeCompressor, etc.). Source: headroom/transforms/content_router.py.
- SmartCrusher – A Rust-backed statistical compressor for JSON arrays with CCR (Compress-Cache-Retrieve) integration. Source: headroom/transforms/smart_crusher.py.
Pipeline Orchestration and Execution Flow
The TransformPipeline.apply method in headroom/transforms/pipeline.py executes transforms through a deterministic five-step process:
- Token counting – The original message list is tokenized using a model-specific Tokenizer.
- Deep copy – A mutable copy of the messages is created via deep_copy_messages to prevent side effects.
- Transform iteration – The pipeline iterates over configured transforms in order. For each transform:
  - should_apply evaluates whether the transform should run for the current payload.
  - apply receives the current messages and Tokenizer, returning a TransformResult containing mutated messages, token counts, and applied transform identifiers.
- Aggregation – The pipeline aggregates transforms_applied, markers_inserted, warnings, and timing information.
- Final recount – After the last transform, a final full token recount produces the definitive TransformResult.
OpenTelemetry spans optionally record model, provider, token deltas, and per-transform duration for observability.
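from headroom.transforms.pipeline import TransformPipeline

# Example payload; any list of chat-style message dicts can be used here.
messages = [{"role": "user", "content": "Summarize the attached tool output."}]

pipeline = TransformPipeline()
result = pipeline.apply(messages, model="gpt-4o", model_limit=128_000)
print(result.transforms_applied)  # e.g., ["cache_aligner", "smart:lossless"]
print(result.tokens_before, result.tokens_after)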
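The five-step flow described above can be approximated with a small, dependency-free sketch. Everything in it is a stand-in rather than Headroom code: SketchResult mirrors the TransformResult fields named in this article (the timings field is hypothetical), count_tokens replaces the model-specific Tokenizer with a crude characters-per-token estimate, and CollapseWhitespace is a toy transform invented for illustration.

```python
import copy
import time
from dataclasses import dataclass, field


@dataclass
class SketchResult:
    """Stand-in for TransformResult; field names follow the article."""
    messages: list
    tokens_before: int
    tokens_after: int
    transforms_applied: list = field(default_factory=list)
    markers_inserted: list = field(default_factory=list)
    warnings: list = field(default_factory=list)
    timings: dict = field(default_factory=dict)  # hypothetical field


def count_tokens(messages):
    """Crude estimate (4 characters per token), not a real tokenizer."""
    return sum(len(str(m.get("content", ""))) // 4 for m in messages)


class CollapseWhitespace:
    """Toy transform: collapses runs of whitespace in string content."""
    name = "collapse_whitespace"

    def should_apply(self, messages, **kwargs):
        return any(isinstance(m.get("content"), str) for m in messages)

    def apply(self, messages, **kwargs):
        out = [
            dict(m, content=" ".join(m["content"].split()))
            if isinstance(m.get("content"), str) else m
            for m in messages
        ]
        return SketchResult(out, count_tokens(messages), count_tokens(out),
                            transforms_applied=[self.name])


def run_pipeline(messages, transforms):
    tokens_before = count_tokens(messages)       # 1. token counting
    current = copy.deepcopy(messages)            # 2. deep copy
    applied, markers, warnings, timings = [], [], [], {}
    for t in transforms:                         # 3. transform iteration
        if not t.should_apply(current):
            continue
        start = time.perf_counter()
        result = t.apply(current)
        timings[t.name] = time.perf_counter() - start
        current = result.messages
        applied += result.transforms_applied     # 4. aggregation
        markers += result.markers_inserted
        warnings += result.warnings
    tokens_after = count_tokens(current)         # 5. final recount
    return SketchResult(current, tokens_before, tokens_after,
                        applied, markers, warnings, timings)
```

Calling run_pipeline(messages, [CollapseWhitespace()]) leaves the caller's list untouched because all mutation happens on the deep copy, which is the property the second step is meant to guarantee.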
Default Transform Order and Logic
The default pipeline, constructed by _build_default_transforms, executes transforms in the following order:
- Tool-result interceptors – Optional early-stage transforms that pre-shrink tool output before main compressors run.
- Cache Aligner – Stabilizes dynamic fragments in system prompts to maximize cache efficiency.
- Content Router – A content-aware dispatcher that selects specialized compressors based on message type:
  - JSON arrays → SmartCrusher
  - Plain text → Kompress (optional ML compressor) or passthrough
  - Code blocks → CodeCompressor (AST-aware)
  - Logs → LogCompressor
  - Search results → SearchCompressor
  - HTML → HTMLExtractor
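To make the dispatch idea concrete, here is a minimal sketch of content-aware routing. It is not the ContentRouter implementation: the detection heuristics, the ROUTES table, and the stand-in compressors (compact_json, passthrough) are all assumptions chosen to keep the example self-contained.

```python
import json


def detect_content_type(text):
    """Very rough content sniffing; these heuristics are illustrative only."""
    stripped = text.strip()
    if stripped.startswith("["):
        try:
            if isinstance(json.loads(stripped), list):
                return "json_array"
        except json.JSONDecodeError:
            pass  # Not valid JSON, so fall through to the other checks.
    if stripped.lower().startswith(("<!doctype html", "<html")):
        return "html"
    if stripped.startswith(("def ", "class ", "import ")):
        return "code"
    return "text"


def passthrough(text):
    """Stand-in for compressors this sketch does not implement."""
    return text


def compact_json(text):
    """Stand-in JSON compressor: re-serializes without extra whitespace."""
    return json.dumps(json.loads(text), separators=(",", ":"))


# Hypothetical routing table mapping a detected type to a handler.
ROUTES = {
    "json_array": compact_json,
    "html": passthrough,
    "code": passthrough,
    "text": passthrough,
}


def route(text):
    """Return the detected type and the handler's output for that type."""
    kind = detect_content_type(text)
    return kind, ROUTES[kind](text)
```

A table-driven dispatcher like this keeps detection separate from compression, so adding a new content type means adding one detector branch and one table entry.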
Modern Headroom versions (post Phase-B PR-B1) mutate only message content rather than dropping entire messages from the list.
The SmartCrusher: JSON Compression Engine
SmartCrusher serves as the default JSON compressor and operates as a thin Python wrapper around a Rust implementation (headroom._core). It detects statistical patterns including time-series spikes and clusters, keeps change-points, factors out constants, and optionally compacts arrays into CSV-style schema strings.
The component emits CCR markers (<<ccr:HASH …>>) when rows are dropped, storing original payloads in a short-lived in-process cache (compression_store in headroom/cache/compression_store.py). Compression outcomes are recorded to the TOIN learning system (headroom/telemetry/toin.py) to adapt future compression policies.
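The compress-cache-retrieve idea can be illustrated with a deliberately simplified sketch. It keeps the first few rows instead of applying any statistical analysis, uses a plain dict in place of compression_store (whose API is not shown here), and emits only the hash portion of the marker because the remaining marker fields are not spelled out in this article.

```python
import hashlib
import json

# Hypothetical in-process store standing in for compression_store.
_store = {}


def crush_rows(rows, keep=2):
    """Keep the first `keep` rows, cache the full payload, append a marker."""
    if len(rows) <= keep:
        return rows
    payload = json.dumps(rows, sort_keys=True)
    key = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]
    _store[key] = payload
    # Only the hash is emitted; the real marker carries further fields.
    return rows[:keep] + [f"<<ccr:{key}>>"]


def retrieve(marker):
    """Look up the original rows for a marker produced by crush_rows."""
    key = marker[len("<<ccr:"):-len(">>")]
    return json.loads(_store[key])
```

Because the key is derived from the payload itself, crushing the same rows twice yields the same marker, which keeps the compressed output stable across repeated requests.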
How to Extend the Pipeline with Custom Transforms
Extending the Headroom transform pipeline requires subclassing Transform and implementing three core methods.
First, create a subclass that defines:
- name – A short string identifier used in logs and transforms_applied arrays.
- should_apply(messages, tokenizer, **kwargs) – Returns True only when the transform is relevant (e.g., for messages exceeding a token threshold).
- apply(messages, tokenizer, **kwargs) – Returns a TransformResult with mutated messages and accurate token counts.
# my_transform.py
from headroom.transforms.base import Transform, TransformResult


class MyUpperCaseTransform(Transform):
    name = "uppercase"

    def should_apply(self, messages, tokenizer, **kwargs):
        # Run only when some user message has string content above 100 tokens.
        return any(
            tokenizer.count_text(m["content"]) > 100
            for m in messages
            if m.get("role") == "user" and isinstance(m.get("content"), str)
        )

    def apply(self, messages, tokenizer, **kwargs):
        new_messages = []
        for m in messages:
            if isinstance(m.get("content"), str):
                # Copy before editing so the input list, and tokens_before, stay accurate.
                m = {**m, "content": m["content"].upper()}
            new_messages.append(m)
        return TransformResult(
            messages=new_messages,
            tokens_before=tokenizer.count_messages(messages),
            tokens_after=tokenizer.count_messages(new_messages),
            transforms_applied=[self.name],
            markers_inserted=[],
            warnings=[],
        )
Second, inject the transform into the pipeline by appending it to the transform list. Order matters because transforms execute sequentially:
from headroom.transforms.pipeline import TransformPipeline
from my_transform import MyUpperCaseTransform

pipeline = TransformPipeline(transforms=[
    *TransformPipeline()._build_default_transforms(),
    MyUpperCaseTransform(),  # Executes after defaults
])
result = pipeline.apply(messages, model="gpt-4o", model_limit=128_000)
Third, optionally expose configuration knobs in HeadroomConfig (headroom/config.py) to enable or disable the transform via environment variables or config files.
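One way such a knob might look is sketched below. The UppercaseSettings class, its fields, and the MYAPP_UPPERCASE_ environment prefix are all hypothetical; the actual layout of HeadroomConfig is not described here, so treat this as a standalone pattern rather than the library's configuration API.

```python
import os
from dataclasses import dataclass


def _env_flag(name, default):
    """Parse a boolean environment variable, falling back to `default`."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


@dataclass
class UppercaseSettings:
    """Hypothetical settings block for the custom transform."""
    enabled: bool = True
    min_tokens: int = 100

    @classmethod
    def from_env(cls, env_prefix="MYAPP_UPPERCASE_"):
        # Unset variables fall back to the dataclass defaults.
        return cls(
            enabled=_env_flag(env_prefix + "ENABLED", cls.enabled),
            min_tokens=int(os.environ.get(env_prefix + "MIN_TOKENS", cls.min_tokens)),
        )
```

A transform could then read settings.enabled inside should_apply and return False early, which disables it without removing it from the pipeline list.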
Hook-Based Customization Options
For tighter integration without modifying the pipeline order, Headroom provides a hooks system that runs at preCompress and postCompress stages. These hooks execute before and after the entire pipeline, allowing arbitrary logic or message mutation.
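The general shape of pre and post hooks can be sketched as follows. The function name compress_with_hooks, the hook signatures (a message list in, a message list out), and both example hooks are assumptions made for illustration; the registration API for preCompress and postCompress is not covered in this article.

```python
def compress_with_hooks(messages, compress, pre_compress=None, post_compress=None):
    """Run optional hooks around a compression callable (hypothetical wiring)."""
    if pre_compress is not None:
        messages = pre_compress(messages)
    messages = compress(messages)
    if post_compress is not None:
        messages = post_compress(messages)
    return messages


def strip_content(messages):
    """Example pre hook: trim surrounding whitespace from string content."""
    return [
        dict(m, content=m["content"].strip())
        if isinstance(m.get("content"), str) else m
        for m in messages
    ]


def tag_compressed(messages):
    """Example post hook: mark each message as processed."""
    return [dict(m, compressed=True) for m in messages]
```

Both example hooks return new dicts instead of editing their inputs, so a hook cannot accidentally change the caller's original messages.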
Advanced use cases can also wrap existing transforms with decorators that inspect pipeline state via kwargs["last_transform"] to implement conditional execution based on previous transform results.
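A wrapper of this kind might look like the sketch below. RunOnlyAfter and PassThrough are hypothetical names, and the sketch assumes kwargs["last_transform"] holds the name of the previous transform as a plain string; the actual type of that value should be checked against the pipeline source.

```python
class RunOnlyAfter:
    """Wraps a transform so it runs only right after a named transform."""

    def __init__(self, inner, required_previous):
        self.inner = inner
        self.required_previous = required_previous
        self.name = inner.name  # Report under the wrapped transform's name.

    def should_apply(self, messages, tokenizer, **kwargs):
        # Assumption: last_transform is a transform name string.
        if kwargs.get("last_transform") != self.required_previous:
            return False
        return self.inner.should_apply(messages, tokenizer, **kwargs)

    def apply(self, messages, tokenizer, **kwargs):
        return self.inner.apply(messages, tokenizer, **kwargs)


class PassThrough:
    """Minimal stand-in transform used to exercise the wrapper."""
    name = "passthrough"

    def should_apply(self, messages, tokenizer, **kwargs):
        return True

    def apply(self, messages, tokenizer, **kwargs):
        return messages


wrapped = RunOnlyAfter(PassThrough(), required_previous="cache_aligner")
```

Because the wrapper exposes the same name, should_apply, and apply surface as the transform it wraps, it can be placed in the transforms list wherever the original would go.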
Summary
- Headroom's architecture centers on TransformPipeline in headroom/transforms/pipeline.py, which orchestrates an ordered chain of Transform objects.
- Default execution order is: tool interceptors → CacheAligner → ContentRouter (which dispatches to specialized compressors like SmartCrusher).
- Extension mechanism requires subclassing Transform, implementing name, should_apply, and apply, then injecting the instance into the pipeline transforms list.
- SmartCrusher provides Rust-backed JSON compression with CCR markers for retrievable data and integrates with the TOIN learning system.
- Hook system offers preCompress and postCompress integration points for logic that must run outside the standard transform sequence.
Frequently Asked Questions
What is the difference between the Cache Aligner and Content Router transforms?
CacheAligner stabilizes dynamic prefixes (dates, UUIDs, random identifiers) in system prompts to ensure provider-side caches hit consistently, whereas ContentRouter inspects message content types and dispatches them to specialized compressors (JSON to SmartCrusher, code to CodeCompressor, etc.). The Cache Aligner runs earlier in the pipeline to normalize content before routing decisions occur.
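The prefix-stabilizing half of that answer can be illustrated with a short sketch. It is not the CacheAligner implementation: the two regular expressions, the placeholder strings, and the decision to return the removed values are assumptions chosen to show why normalized prompts produce identical cache keys.

```python
import re

UUID_RE = re.compile(
    r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}"
    r"-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
)
ISO_DATE_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}\b")


def stabilize_prefix(system_prompt):
    """Swap volatile fragments for fixed placeholders; return text and removed values."""
    uuids = UUID_RE.findall(system_prompt)
    text = UUID_RE.sub("<UUID>", system_prompt)
    # Dates are matched after UUID removal so the two patterns never interact.
    dates = ISO_DATE_RE.findall(text)
    text = ISO_DATE_RE.sub("<DATE>", text)
    return text, uuids + dates
```

Two prompts that differ only in their date and session identifier normalize to the same string, so a provider cache keyed on the prompt prefix sees them as identical.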
How do I disable a specific transform in the Headroom pipeline?
You can disable transforms via HeadroomConfig in headroom/config.py. Instantiate the configuration class, set the specific transform's enabled flag to False (e.g., cfg.cache_aligner.enabled = False), and pass the config to TransformPipeline(config=cfg). Alternatively, construct a custom pipeline by calling _build_default_transforms(), filtering out unwanted transforms, and passing the filtered list to the TransformPipeline constructor.
Can I insert a custom transform between existing default transforms?
Yes. When constructing the pipeline, expand _build_default_transforms() into a list, insert your custom transform at the desired index position, and pass the modified list to the TransformPipeline constructor. Since transforms execute sequentially and may depend on prior normalization (e.g., Cache Aligner should precede Content Router), ensure your insertion point respects these dependencies.
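The list manipulation itself is plain Python and can be sketched without Headroom installed. The helpers without and insert_before are hypothetical, the SimpleNamespace objects stand in for real transform instances, and the name "content_router" is an assumed identifier for the Content Router transform.

```python
from types import SimpleNamespace


def without(transforms, name):
    """Return a new list that omits every transform with the given name."""
    return [t for t in transforms if t.name != name]


def insert_before(transforms, new_transform, before_name):
    """Return a new list with new_transform placed before the first match.

    Falls back to appending when no transform has the requested name.
    """
    out = list(transforms)
    for i, t in enumerate(out):
        if t.name == before_name:
            out.insert(i, new_transform)
            return out
    out.append(new_transform)
    return out


# Stand-ins for the list returned by _build_default_transforms().
defaults = [
    SimpleNamespace(name="cache_aligner"),
    SimpleNamespace(name="content_router"),
]
custom = SimpleNamespace(name="uppercase")
reordered = insert_before(defaults, custom, "content_router")
```

Matching on name rather than a hard-coded index keeps the insertion point correct even if the default list gains or loses entries between releases.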
What is the purpose of CCR markers in the SmartCrusher?
CCR markers (<<ccr:HASH …>>) are placeholders emitted by SmartCrusher when it removes rows from JSON arrays during compression. These markers reference the original payload stored in the short-lived compression_store (headroom/cache/compression_store.py). If the LLM response requests the compressed data, Headroom can retrieve the original rows from the cache and inject them back into the conversation, ensuring no information is permanently lost during aggressive compression.