How to Use Event Callbacks for Monitoring and Debugging Agent Execution in Flue

Flue exposes two distinct callback mechanisms: subscribeEvent() for persistent multi-subscriber monitoring and setEventCallback() for temporary debugging. Both capture every FlueEvent emitted during an agent's lifecycle.

Flue's runtime architecture generates a comprehensive stream of events describing the complete execution path of AI agents, from HTTP request to final result. According to the withastro/flue source code, the core FlueContext class in packages/sdk/src/client.ts produces these FlueEvent objects to provide deep observability into LLM calls, tool executions, and state mutations.

Flue's Two Event Callback Mechanisms

Flue provides distinct APIs for observing agent execution. Your choice depends on whether you need long-lived monitoring or temporary debugging capabilities.

subscribeEvent(): Production Monitoring

The subscribeEvent(callback) method registers a callback that receives every event emitted for the lifetime of the context. This mechanism is designed for production monitoring where multiple subscribers must coexist without interference.

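  • Async-safe execution: Each subscriber is invoked via Promise.resolve() with a .catch() handler, inside a try/catch, so neither a synchronous throw nor an async rejection in one subscriber stops the others from running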
  • Multiple subscribers: Register unlimited concurrent observers for different monitoring purposes
  • Non-blocking I/O: Safe for database writes, webhook calls, or SSE streaming
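
The isolation described in the list above can be illustrated with a small standalone sketch. It is not Flue's implementation: createSketchEmitter, SketchEvent, and the 'example' event type are hypothetical names, and the assumption that subscribing returns an unsubscribe function is made only for this illustration.

```typescript
// Hypothetical stand-in types; the real FlueEvent carries more fields.
type SketchEvent = { type: string; [key: string]: unknown };
type SketchSubscriber = (event: SketchEvent) => void | Promise<void>;

function createSketchEmitter() {
  const subscribers = new Set<SketchSubscriber>();
  const errors: unknown[] = [];

  // Register a subscriber and return a function that removes it (assumed shape).
  const subscribe = (subscriber: SketchSubscriber): (() => void) => {
    subscribers.add(subscriber);
    return () => { subscribers.delete(subscriber); };
  };

  const emit = (event: SketchEvent): void => {
    subscribers.forEach((subscriber) => {
      try {
        // Promise.resolve() normalizes sync and async subscribers;
        // .catch() absorbs async rejections.
        Promise.resolve(subscriber(event)).catch((error) => { errors.push(error); });
      } catch (error) {
        // try/catch absorbs synchronous throws.
        errors.push(error);
      }
    });
  };

  return { subscribe, emit, errors };
}

const emitter = createSketchEmitter();
const received: string[] = [];

emitter.subscribe(() => { throw new Error('sync failure'); });
emitter.subscribe(async () => { throw new Error('async failure'); });
emitter.subscribe((event) => { received.push(event.type); });

// The third subscriber still runs even though the first two fail.
emitter.emit({ type: 'example' });
```

In this sketch the healthy subscriber receives the event even though two earlier subscribers fail, which is the property the bullets describe.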

setEventCallback(): Debugging Override

The setEventCallback(callback) method replaces the default event handler that Flue uses internally (typically the built-in logger). This is a single-handler replacement, not an additive subscription.

  • Single handler: Replaces the previous callback entirely via internal handlerUnsubscribe?.()
  • Restorable: Call setEventCallback(undefined) to restore the default logger
  • Mutation capable: Transform or inject custom fields before events reach logging systems
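
A minimal sketch of the single-handler replacement pattern follows. It assumes the handler is stored as one subscription whose unsubscribe function is kept in handlerUnsubscribe, as the bullet above suggests. Everything else (the subscriber set, defaultLogger, emit) is a hypothetical stand-in rather than Flue's actual code.

```typescript
type HandlerEvent = { type: string };
type HandlerCallback = (event: HandlerEvent) => void;

const handlerSubscribers = new Set<HandlerCallback>();
const output: string[] = [];

const subscribe = (callback: HandlerCallback): (() => void) => {
  handlerSubscribers.add(callback);
  return () => { handlerSubscribers.delete(callback); };
};

// Stand-in for the built-in logger.
const defaultLogger: HandlerCallback = (event) => { output.push(`default:${event.type}`); };

let handlerUnsubscribe: (() => void) | undefined = subscribe(defaultLogger);

function setEventCallback(callback: HandlerCallback | undefined): void {
  handlerUnsubscribe?.(); // drop whichever handler is currently active
  handlerUnsubscribe = subscribe(callback ?? defaultLogger); // undefined restores the default
}

const emit = (event: HandlerEvent): void => {
  handlerSubscribers.forEach((callback) => callback(event));
};

emit({ type: 'a' });
setEventCallback((event) => { output.push(`debug:${event.type}`); });
emit({ type: 'b' });
setEventCallback(undefined);
emit({ type: 'c' });
// output is now ['default:a', 'debug:b', 'default:c']
```

Only one handler is active at any moment, which is what distinguishes this pattern from an additive subscription.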

Where Events Are Generated in the Source Code

Understanding the event lifecycle requires tracing three critical locations in the Flue SDK.

FlueContextInternal.emitEvent

The core emission logic lives in packages/sdk/src/client.ts (lines 55-71). The emitEvent function decorates raw events with metadata before broadcasting to all subscribers:

const emitEvent = (event: FlueEvent): FlueEvent => {
    const decorated: FlueEvent = {
        ...event,
        runId: config.runId,
        eventIndex: eventIndex++,
        timestamp: new Date().toISOString(),
    };
    for (const subscriber of subscribers) {
        try {
            Promise.resolve(subscriber(decorated)).catch(...);
        } catch (error) { ... }
    }
    return decorated;
};

Each event receives a runId, monotonically increasing eventIndex, and ISO timestamp before distribution to all registered subscribers.

Harness.decorateEventCallback

In packages/sdk/src/harness.ts (lines 183-188), the harness wraps user-supplied callbacks to ensure the harness name is present on every event:

private decorateEventCallback(callback: FlueEventCallback | undefined): FlueEventCallback | undefined {
    return callback
        ? (event) => { callback({ ...event, harness: event.harness ?? this.name }); }
        : undefined;
}

This decoration guarantees traceability when multiple harnesses execute within a single request, ensuring you can always identify which harness produced the event.
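
To make the effect of the ?? operator concrete, here is a standalone sketch of the same wrapping idea. withHarnessName and HarnessEvent are hypothetical names introduced for illustration. The point is that an event that already carries a harness name keeps it, while an unnamed event receives the wrapper's name.

```typescript
type HarnessEvent = { type: string; harness?: string };
type HarnessCallback = (event: HarnessEvent) => void;

// Same shape as the excerpt above, with the harness name passed in explicitly.
function withHarnessName(
  name: string,
  callback: HarnessCallback | undefined,
): HarnessCallback | undefined {
  return callback
    ? (event) => { callback({ ...event, harness: event.harness ?? name }); }
    : undefined;
}

const seen: HarnessEvent[] = [];
const wrapped = withHarnessName('outer', (event) => { seen.push(event); });

wrapped?.({ type: 'x' });                   // no harness yet, so 'outer' is filled in
wrapped?.({ type: 'y', harness: 'inner' }); // existing name 'inner' is preserved
```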

handleAgentRequest Entry Point

The HTTP handler in packages/sdk/src/runtime/handle-agent.ts (line 16) instantiates the FlueContext and routes requests to your agent handler. All events emitted within the handler—including those from ctx.log.info or ctx.emitEvent—flow back through the subscriber system established during context creation.

Code Examples for Monitoring and Debugging

Real-Time Monitoring with Server-Sent Events

Use subscribeEvent() to stream live agent execution data to clients:

import { createFlueContext } from '@flue/sdk';

export async function handler(req: Request) {
  const ctx = createFlueContext({
    id: 'run-123',
    runId: 'run-123',
    payload: await req.json(),
    env: process.env,
    agentConfig: {/* … */},
    createDefaultEnv: async () => {/* … */},
    createLocalEnv: async () => {/* … */},
    defaultStore: new Map(),
  });

  // Stream events as SSE to connected clients.
  // `sseStream` is assumed to be a writable SSE response stream created elsewhere.
  ctx.subscribeEvent((ev) => {
    const sse = `event: ${ev.type}\ndata: ${JSON.stringify(ev)}\n\n`;
    sseStream.write(sse);
  });

  const harness = await ctx.init({ model: false, name: 'monitoring' });
  const session = await harness.session();
  await session.prompt('Analyze this dataset');
}
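
The frame format used in the subscriber above can be factored into a small helper. The sketch below is a hypothetical variant (toSseFrame is not part of Flue) that also writes the standard SSE id field from eventIndex, on the assumption that clients may want to resume from the last event they saw. The event shape and the 'log' type are made up for the example.

```typescript
type SseSourceEvent = { type: string; eventIndex: number; [key: string]: unknown };

// Build one SSE frame: id, event, and data lines followed by a blank line.
function toSseFrame(event: SseSourceEvent): string {
  return [
    `id: ${event.eventIndex}`,
    `event: ${event.type}`,
    // JSON.stringify escapes newlines inside strings, so the payload stays on one data line.
    `data: ${JSON.stringify(event)}`,
    '',
    '',
  ].join('\n');
}

const frame = toSseFrame({ type: 'log', eventIndex: 3, message: 'line one\nline two' });
```

A subscriber could then call sseStream.write(toSseFrame(ev)), provided the events it receives carry eventIndex as described earlier.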

Debugging with setEventCallback

Replace the default logger for detailed step-by-step inspection during development:

import { createFlueContext } from '@flue/sdk';

async function debugRun() {
  const ctx = createFlueContext({
    id: 'debug-run',
    runId: 'debug-run',
    payload: {},
    env: {},
    agentConfig: {/* … */},
    createDefaultEnv: async () => {/* … */},
    createLocalEnv: async () => {/* … */},
    defaultStore: new Map(),
  });

  ctx.setEventCallback((ev) => {
    console.debug('[AGENT DEBUG]', ev.type, ev);
    // Return event to maintain chain if other processing exists
    return ev;
  });

  const harness = await ctx.init({ model: false });
  const session = await harness.session();
  await session.prompt('Debug this execution');
  
  // Restore default logging when finished
  ctx.setEventCallback(undefined);
}

Decorating Events with Custom Context

Inject correlation IDs or additional metadata for distributed tracing:

import { createFlueContext } from '@flue/sdk';

async function contextualRun() {
  const ctx = createFlueContext({
    id: 'my-run',
    runId: 'my-run',
    payload: {},
    env: process.env,
    agentConfig: {/* … */},
    createDefaultEnv: async () => {/* … */},
    createLocalEnv: async () => {/* … */},
    defaultStore: new Map(),
  });

  ctx.setEventCallback((ev) => {
    return { ...ev, correlationId: 'abc-123', environment: 'staging' };
  });

  // All subsequent events include the correlationId
  const harness = await ctx.init({ model: false });
  // … execute agent logic …
}

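Key Source Files Reference

  • packages/sdk/src/client.ts: FlueContext and emitEvent, which decorates each event with runId, eventIndex, and timestamp before broadcasting it to subscribers
  • packages/sdk/src/harness.ts: decorateEventCallback, which adds the harness name to every event
  • packages/sdk/src/runtime/handle-agent.ts: handleAgentRequest, the HTTP entry point that instantiates the FlueContext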

Summary

  • subscribeEvent() enables persistent, multi-subscriber monitoring suitable for production dashboards and real-time SSE streaming without blocking agent execution
  • setEventCallback() provides a temporary, single-handler override for debugging sessions that can be restored to defaults by passing undefined
  • Events are decorated with runId, eventIndex, and timestamps in packages/sdk/src/client.ts before distribution to all subscribers
  • The harness automatically injects harness names into events via decorateEventCallback in packages/sdk/src/harness.ts to ensure traceability
  • Both mechanisms support async I/O operations via Promise.resolve() wrappers that prevent subscriber failures from affecting other observers or the main agent flow

Frequently Asked Questions

What is the difference between subscribeEvent and setEventCallback in Flue?

subscribeEvent() adds your callback to a list of persistent subscribers that all receive every event, while setEventCallback() replaces the single internal handler (usually the logger) used by Flue. Use subscribeEvent for production monitoring where multiple observers must coexist, and setEventCallback for temporary debugging when you need to intercept or modify events before logging.

Can I use multiple event callbacks simultaneously in Flue?

Yes, but only through subscribeEvent(), which supports unlimited concurrent subscribers. Each subscriber executes independently via Promise.resolve(), ensuring that errors in one callback do not affect others. setEventCallback() restricts you to one active handler at a time and removes the previous handler during registration.

How do I correlate events with specific agent runs when monitoring Flue agents?

Every FlueEvent automatically receives a runId and monotonically increasing eventIndex via the emitEvent function in packages/sdk/src/client.ts. Additionally, the harness decorator in packages/sdk/src/harness.ts injects a harness identifier. Combine these fields—runId, eventIndex, and harness—to trace the complete execution path of any specific agent run.
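
As a rough illustration of combining these fields, the sketch below groups collected events by runId and orders each group by eventIndex. TracedEvent and groupByRun are hypothetical names, and the 'start' and 'end' event types are invented for the example.

```typescript
type TracedEvent = { runId: string; eventIndex: number; type: string; harness?: string };

// Group events per run, then restore emission order within each run.
function groupByRun(events: TracedEvent[]): Map<string, TracedEvent[]> {
  const runs = new Map<string, TracedEvent[]>();
  events.forEach((event) => {
    const bucket = runs.get(event.runId) ?? [];
    bucket.push(event);
    runs.set(event.runId, bucket);
  });
  runs.forEach((bucket) => {
    bucket.sort((a, b) => a.eventIndex - b.eventIndex);
  });
  return runs;
}

// Events may arrive out of order, for example from a log aggregator.
const runs = groupByRun([
  { runId: 'run-2', eventIndex: 0, type: 'start' },
  { runId: 'run-1', eventIndex: 1, type: 'end', harness: 'monitoring' },
  { runId: 'run-1', eventIndex: 0, type: 'start', harness: 'monitoring' },
]);

const run1Types = (runs.get('run-1') ?? []).map((event) => event.type);
// run1Types is ['start', 'end']
```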

Is it safe to perform database writes or HTTP requests inside an event callback?

Yes. Flue wraps subscriber invocations in Promise.resolve() and handles rejections gracefully inside emitEvent, making it safe to perform asynchronous I/O operations like database writes, webhook calls, or SSE streaming inside your subscribeEvent callbacks without blocking the agent's execution flow.
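
One way to keep such I/O cheap is to batch events before writing them. The following is a sketch under stated assumptions, not a Flue feature: createBatchingSubscriber, the sink function, and the batch size are all hypothetical. The subscriber returns the flush promise so that a rejected write would surface to whatever .catch() the emitter attaches.

```typescript
type BufferedEvent = { type: string };

function createBatchingSubscriber(
  sink: (batch: BufferedEvent[]) => Promise<void>, // e.g. a database insert (assumed)
  batchSize: number,
) {
  let buffer: BufferedEvent[] = [];

  const flush = async (): Promise<void> => {
    if (buffer.length === 0) return;
    const batch = buffer;
    buffer = []; // swap before awaiting so events arriving mid-write are kept
    await sink(batch);
  };

  const subscriber = (event: BufferedEvent): Promise<void> => {
    buffer.push(event);
    return buffer.length >= batchSize ? flush() : Promise.resolve();
  };

  return { subscriber, flush, pending: () => buffer.length };
}

const batches: BufferedEvent[][] = [];
const batching = createBatchingSubscriber(async (batch) => { batches.push(batch); }, 2);

batching.subscriber({ type: 'a' });
batching.subscriber({ type: 'b' }); // reaches batchSize and triggers a flush
batching.subscriber({ type: 'c' }); // stays buffered until the next flush
```

In this design a final call to flush() when the run ends would be needed to write any remaining buffered events.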
