How Headroom’s Pipeline Extension System Works with `onPipelineEvent`
Headroom’s pipeline extension system is a publish-subscribe lifecycle mechanism that exposes every compression stage through the onPipelineEvent callback, letting you inspect, log, or augment transforms without modifying core engine code.
Headroom—an open-source SDK in the chopratejas/headroom repository—processes each request through a sequence of discrete transforms managed by an internal engine. The Headroom pipeline extension system surfaces granular lifecycle events via the onPipelineEvent handler that you supply to HeadroomClient. Hooking into these events enables custom telemetry, real-time token monitoring, and dynamic bias injection.
Pipeline Architecture and Event Lifecycle
At the heart of the system is the engine located in plugins/openclaw/src/engine.ts. This module iterates over its built-in transforms—bias computation, token-budget enforcement, CCR lookup, and others—and invokes your callback before and after each stage. Because the handler may run synchronously or return a Promise, you can safely execute side effects such as external lookups or telemetry writes.
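To make the lifecycle concrete, here is a minimal, self-contained sketch of an engine loop that emits the four events described in this article. It is not the code in plugins/openclaw/src/engine.ts: `runPipeline`, `Transform`, and `PipelineEventHandler` are hypothetical names, and transforms are reduced to functions over a token count. The sketch emits one `pipeline:step` after each transform and awaits the handler so that synchronous and Promise-returning callbacks are both supported.

```typescript
// Hypothetical sketch: names and shapes are assumptions, not Headroom's engine code.
type PipelineEventName =
  | "pipeline:start"
  | "pipeline:step"
  | "pipeline:complete"
  | "pipeline:error";

// A handler may return nothing or a Promise; awaiting it covers both cases.
type PipelineEventHandler = (
  event: PipelineEventName,
  data: Record<string, unknown>
) => void | Promise<void>;

// Stand-in for a real transform: maps a token count to a new token count.
interface Transform {
  name: string;
  apply: (tokens: number) => number;
}

async function runPipeline(
  tokens: number,
  transforms: Transform[],
  onPipelineEvent?: PipelineEventHandler
): Promise<number> {
  const initialTokens = tokens;
  await onPipelineEvent?.("pipeline:start", { startTime: Date.now() });

  for (const transform of transforms) {
    const tokensBefore = tokens;
    const startedAt = Date.now();
    try {
      tokens = transform.apply(tokens);
    } catch (error) {
      // Report the failing step, then abort the pipeline.
      await onPipelineEvent?.("pipeline:error", { error, step: transform.name });
      throw error;
    }
    await onPipelineEvent?.("pipeline:step", {
      step: transform.name,
      tokensBefore,
      tokensAfter: tokens,
      durationMs: Date.now() - startedAt,
    });
  }

  await onPipelineEvent?.("pipeline:complete", {
    tokensSaved: initialTokens - tokens,
    transformsApplied: transforms.map((t) => t.name),
  });
  return tokens;
}

// Usage: a plain synchronous logger is accepted just like an async handler.
runPipeline(
  120,
  [{ name: "trim", apply: (n) => n - 20 }],
  (event, data) => console.log(event, data)
);
```

Awaiting the handler inside the loop means a slow callback delays the next transform, so in a design like this one, expensive telemetry writes are better queued than awaited inline.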
Event Types and Payloads
The engine emits four distinct events as plain objects:
| Event name | When it fires | Payload fields |
|---|---|---|
| `pipeline:start` | Right before the first transform runs | `requestId`, `model`, `startTime` |
| `pipeline:step` | After each individual transform (e.g., bias, cache, compression) | `step`, `tokensBefore`, `tokensAfter`, `durationMs` |
| `pipeline:complete` | When the whole pipeline finishes | `tokensSaved`, `compressionRatio`, `transformsApplied` |
| `pipeline:error` | If a transform throws and the pipeline aborts | `error`, `step` |
Each payload gives you stage-specific metadata to trace exactly how a request is mutated.
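Because the handler in this article receives `data` as `any`, it can help to describe the payloads yourself. The sketch below models the four events from the table above as a discriminated union. The type names and the field types (for example, `startTime` as a number) are assumptions inferred from the field names, not typings exported by the SDK.

```typescript
// Hypothetical typings inferred from the event table; field types are assumptions.
interface PipelineStartData {
  requestId: string;
  model: string;
  startTime: number;
}
interface PipelineStepData {
  step: string;
  tokensBefore: number;
  tokensAfter: number;
  durationMs: number;
}
interface PipelineCompleteData {
  tokensSaved: number;
  compressionRatio: number;
  transformsApplied: string[];
}
interface PipelineErrorData {
  error: unknown;
  step: string;
}

// The `name` field discriminates the union, so `data` narrows inside each case.
type PipelineEvent =
  | { name: "pipeline:start"; data: PipelineStartData }
  | { name: "pipeline:step"; data: PipelineStepData }
  | { name: "pipeline:complete"; data: PipelineCompleteData }
  | { name: "pipeline:error"; data: PipelineErrorData };

function describeEvent(e: PipelineEvent): string {
  switch (e.name) {
    case "pipeline:start":
      return `start ${e.data.requestId} (${e.data.model})`;
    case "pipeline:step":
      return `${e.data.step}: ${e.data.tokensBefore} -> ${e.data.tokensAfter} tokens in ${e.data.durationMs} ms`;
    case "pipeline:complete":
      return `complete: saved ${e.data.tokensSaved} tokens across ${e.data.transformsApplied.length} transforms`;
    case "pipeline:error":
      return `error in ${e.data.step}: ${String(e.data.error)}`;
  }
}
```

With a union like this, the compiler flags a misspelled field or an unhandled event name at build time instead of leaving it to surface at runtime.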
How onPipelineEvent Integrates with the SDK
The call chain crosses three primary TypeScript modules. Understanding this flow is critical to using the pipeline extension system effectively.
From compress() to the Engine
- The high-level `compress()` function in `sdk/typescript/src/compress.ts` constructs a `CompressContext` and calls the client.
- The `HeadroomClient` in `sdk/typescript/src/client.ts` stores your `onPipelineEvent` callback and forwards the request to the engine.
- The engine in `plugins/openclaw/src/engine.ts` loops through its transforms and fires `onPipelineEvent` with the appropriate event name and data.
Aggregating Results with postCompress
After the engine returns, compress() triggers the postCompress hook defined in sdk/typescript/src/hooks.ts. This hook receives a CompressEvent that aggregates the metrics from the final pipeline:complete step. While onPipelineEvent delivers per-step granularity, postCompress provides a single, high-level summary.
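The relationship between the two views can be illustrated by folding per-step events into a single summary on the caller's side. The sketch below is a hypothetical helper, not the SDK's `CompressEvent` or its aggregation logic: `createStepCollector`, `StepSummary`, and `timingByStep` are made-up names, and the handler simply follows the `(event, data)` callback shape used in this article.

```typescript
// Hypothetical helper: none of these names come from headroom-sdk.
interface StepData {
  step: string;
  tokensBefore: number;
  tokensAfter: number;
  durationMs: number;
}

interface StepSummary {
  tokensSaved: number;
  totalDurationMs: number;
  timingByStep: Record<string, number>;
}

function createStepCollector() {
  const steps: StepData[] = [];
  return {
    // Records only pipeline:step payloads and ignores every other event.
    handler(event: string, data: unknown): void {
      if (event === "pipeline:step") {
        steps.push(data as StepData);
      }
    },
    // Reduces the recorded steps to one high-level summary object.
    summarize(): StepSummary {
      const timingByStep: Record<string, number> = {};
      let tokensSaved = 0;
      let totalDurationMs = 0;
      for (const s of steps) {
        tokensSaved += s.tokensBefore - s.tokensAfter;
        totalDurationMs += s.durationMs;
        timingByStep[s.step] = (timingByStep[s.step] ?? 0) + s.durationMs;
      }
      return { tokensSaved, totalDurationMs, timingByStep };
    },
  };
}
```

A collector like this would be created per request, its `handler` passed as the `onPipelineEvent` callback, and `summarize()` called once the pipeline has finished.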
Practical Examples
Logging Pipeline Stages via onPipelineEvent
Pass a handler directly to the client constructor to monitor every lifecycle stage:
import { HeadroomClient, compress } from "headroom-sdk";

// Log one line per lifecycle event emitted by the pipeline.
function onPipelineEvent(event: string, data: any) {
  switch (event) {
    case "pipeline:start":
      console.log(`🛠️ Pipeline started – request ${data.requestId}`);
      break;
    case "pipeline:step":
      console.log(
        `🔧 Step ${data.step} – ${data.tokensBefore}→${data.tokensAfter} tokens`
      );
      break;
    case "pipeline:complete":
      console.log(
        `✅ Pipeline complete – saved ${data.tokensSaved} tokens (${(
          data.compressionRatio * 100
        ).toFixed(1)}%)`
      );
      break;
    case "pipeline:error":
      console.error(`❗️ Error in step ${data.step}:`, data.error);
      break;
  }
}

const client = new HeadroomClient({
  apiKey: "YOUR_HEADROOM_KEY",
  onPipelineEvent,
});

const messages = [
  { role: "user", content: "Explain micro-services." },
  { role: "assistant", content: "Micro-services are …" },
];

// Top-level await requires an ES module context.
const result = await compress(messages, { client, model: "gpt-4o" });
console.log("📦 Final compressed payload:", result.messages);
Observing Results with the Hooks API
If you prefer a declarative interface, define a postCompress hook. The payload mirrors the data emitted by pipeline:complete:
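import { HeadroomClient, compress, type CompressionHooks } from "headroom-sdk";

// The compress() call below needs a client; construct it as in the previous example.
const client = new HeadroomClient({ apiKey: "YOUR_HEADROOM_KEY" });

const myHooks: CompressionHooks = {
  // Pass messages through unchanged.
  preCompress: async (msgs, ctx) => msgs,
  // Supply no custom biases.
  computeBiases: async (openaiMsgs, ctx) => ({}),
  // Runs once with the aggregated CompressEvent.
  postCompress: async (event) => {
    console.log("🔔 post-compress hook:", event);
  },
};

const messages = [
  { role: "user", content: "Explain micro-services." },
];

await compress(messages, { client, hooks: myHooks });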
Key Source Files
The implementation of the Headroom pipeline extension system spans the following files:
- `sdk/typescript/src/compress.ts` – Entry point that builds `CompressContext` and fires the `postCompress` hook.
- `sdk/typescript/src/hooks.ts` – Defines `CompressionHooks`, `CompressContext`, and `CompressEvent`.
- `sdk/typescript/src/client.ts` – Implements `HeadroomClient`; stores and forwards the `onPipelineEvent` callback.
- `plugins/openclaw/src/engine.ts` – Core pipeline runner that iterates over transforms and emits lifecycle events.
- `sdk/typescript/src/types/models.ts` – Contains the `pipelineTiming` metric populated from `pipeline:step` events.
Summary
- The Headroom pipeline extension system wraps the compression engine in a publish-subscribe lifecycle layer.
- You provide an `onPipelineEvent` handler to `HeadroomClient` to receive `pipeline:start`, `pipeline:step`, `pipeline:complete`, and `pipeline:error` events.
- The engine in `plugins/openclaw/src/engine.ts` invokes this handler at each stage, allowing inspection, logging, or data augmentation without core code changes.
- The `compress()` function in `sdk/typescript/src/compress.ts` aggregates final metrics and exposes them through the optional `postCompress` hook.
- Per-step latency data is tracked in `pipelineTiming` as defined in `sdk/typescript/src/types/models.ts`.
Frequently Asked Questions
What events does Headroom’s pipeline extension system emit?
The engine emits pipeline:start, pipeline:step, pipeline:complete, and pipeline:error. Each carries a plain-object payload with stage-specific metadata such as token counts, transform names, and timing data.
Can I modify pipeline data inside an onPipelineEvent handler?
Yes. The callback receives mutable stage data and may run synchronously or return a Promise. This lets you inject additional bias information, trigger external CCR look-ups, or append custom telemetry tags before the next transform executes.
How does onPipelineEvent differ from the postCompress hook?
onPipelineEvent fires multiple times per request, once per lifecycle stage, offering real-time, per-transform visibility. The postCompress hook fires exactly once per compress() call, after the engine returns, and receives only the final CompressEvent aggregated from the pipeline:complete event.
Where is the pipeline engine implemented in the Headroom source code?
The pipeline engine that orchestrates transforms and invokes onPipelineEvent lives in plugins/openclaw/src/engine.ts. The client storing your callback is in sdk/typescript/src/client.ts, and the top-level orchestrator is sdk/typescript/src/compress.ts.