Implementing Server-Sent Events Streaming for Real-Time Agent Output in Flue

Flue's runtime supports Server-Sent Events (SSE) to stream intermediate agent events and final results over HTTP, enabling real-time LLM output monitoring without polling.

The withastro/flue framework delivers agent execution updates through Server-Sent Events (SSE), allowing clients to receive real-time streaming data as agents process tasks. This implementation eliminates the need for polling by maintaining an open HTTP connection that pushes events from the server the moment they occur.

Detecting SSE Mode from Request Headers

When a POST request hits /<agent-name>/<id>, the framework examines request headers in handleAgentRequest to determine the response mode. According to the source code in packages/sdk/src/runtime/handle-agent.ts, the logic inspects the Accept header and X-Webhook flag:

// packages/sdk/src/runtime/handle-agent.ts#L96-L104
const accept = request.headers.get('accept') || '';
const isWebhook = request.headers.get('x-webhook') === 'true';
const isSSE = accept.includes('text/event-stream') && !isWebhook;

if (isWebhook)          → runWebhookMode(...)
else if (isSSE)          → runSseMode(...)
elserunSyncMode(...)

The SSE mode triggers only when the client sends Accept: text/event-stream and omits the X-Webhook: true header. This distinction allows the same endpoint to support synchronous, webhook, and streaming responses depending on client needs.

Building the SSE Response Stream

The runSseMode function in packages/sdk/src/runtime/handle-agent.ts creates a TransformStream whose readable side becomes the HTTP response body. The writable side receives SSE-formatted chunks:

// packages/sdk/src/runtime/handle-agent.ts#L65-L78
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
const encoder = new TextEncoder();

Two helper functions manage the stream:

  • writeSSE(data, eventType) formats events with proper SSE syntax (event: …, id: …, data: …) and writes them to the stream.
  • writeHeartbeat() emits comment lines (: heartbeat) to keep the connection alive.

Both helpers guard against client-closed errors, silently ignoring write failures so the agent can complete its execution.

Maintaining Connections with Heartbeats

To prevent intermediate proxies or browsers from closing idle connections, Flue sends periodic heartbeats every 15 seconds. The heartbeat interval is defined as a constant in the handler:

// packages/sdk/src/runtime/handle-agent.ts#L11-L14
export const SSE_HEARTBEAT_MS = 15_000;
const heartbeat = setInterval(() => writeHeartbeat().catch(() => {}), SSE_HEARTBEAT_MS);

When the agent finishes or throws an error, the runtime clears this interval, removes the event callback, closes the writer, and emits the final run_end event.

Managing the Agent Event Lifecycle

After creating a TransformStream, runSseMode instantiates a run lifecycle and registers an event callback on the agent context. The context methods setEventCallback and emitEvent are defined in packages/sdk/src/context.ts, connecting agent logic to the runtime stream:

// packages/sdk/src/runtime/handle-agent.ts#L26-L31
ctx.setEventCallback((event) => {
  if (event.type === 'idle') isIdle = true;
  writeSSE(event, event.type).catch(() => {});
});

Every time the agent calls ctx.emitEvent(), the callback streams the event via writeSSE. The runtime forces an idle event before the terminal run_end to guarantee a clean end-of-stream. The final run_end event contains either the result or error payload, automatically emitted by the run-lifecycle code.

Encoding SSE Events

Low-level SSE text encoding resides in packages/sdk/src/runtime/handle-run-routes.ts. The encodeSseEvent function formats events for the wire, using the same structure for live streams and stored event replays:

// packages/sdk/src/runtime/handle-run-routes.ts#L7-L13
function encodeSseEvent(event: FlueEvent): string {
  const id = typeof event.eventIndex === 'number' ? event.eventIndex : 0;
  return [
    `event: ${event.type}`,
    `id: ${id}`,
    `data: ${JSON.stringify(event)}`,
    '',
    '',
  ].join('\n');
}

This format ensures compatibility with standard SSE consumers, including the Flue CLI and browser-based EventSource clients.

Consuming the Stream from the Client

Clients must initiate requests with the Accept: text/event-stream header to receive the SSE stream. The Flue CLI (flue run) implements this in packages/cli/bin/flue.ts through the consumeSSE function:

// packages/cli/bin/flue.ts#L34-L42
const res = await fetch(url, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Accept: 'text/event-stream',
  },
  body: payload,
  signal,
});

The consumer reads the response body chunk-by-chunk, splits on double-newlines to isolate individual SSE events, and parses the data: payload as JSON. It reacts to run_end for the final result or error for in-stream failures.

Complete Client Implementation Example

The following JavaScript example demonstrates how to consume the SSE stream from a custom client:

import fetch from 'node-fetch';

async function runAgent(name, id, payload) {
  const url = `http://localhost:3583/agents/${name}/${id}`;
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Accept: 'text/event-stream',
    },
    body: JSON.stringify(payload),
  });

  if (!res.ok) throw new Error(`HTTP ${res.status}`);

  const decoder = new TextDecoder();
  let buffer = '';

  for await (const chunk of res.body) {
    buffer += decoder.decode(chunk, { stream: true });
    const parts = buffer.split('\n\n');
    buffer = parts.pop() ?? '';

    for (const part of parts) {
      if (!part.trim()) continue;
      const dataLines = part
        .split('\n')
        .filter(l => l.startsWith('data:'))
        .map(l => l.replace(/^data:\s?/, ''));

      const event = JSON.parse(dataLines.join('\n'));

      if (event.type === 'run_end') {
        console.log('✅ Run finished:', event.result);
        return;
      }

      // Render live events (LLM tokens, tool calls, etc.)
      console.log(`[${event.type}]`, event);
    }
  }
}

This implementation buffers partial chunks, splits on SSE message boundaries, and parses the JSON payload to handle real-time updates.

Defining an Agent that Emits Events

To stream data, agents must emit events using the context API. Here is an example agent that pushes incremental updates:

// agents/echo.ts
import { init } from '@flue/sdk';

export default init({
  async handler(ctx) {
    ctx.emitEvent({ type: 'message', text: 'Starting…' });
    for (let i = 0; i < 5; i++) {
      await new Promise(r => setTimeout(r, 500));
      ctx.emitEvent({ type: 'tick', count: i + 1 });
    }
    ctx.emitEvent({ type: 'message', text: 'Done.' });
    return { ok: true };
  },
});

When executed via flue run echo --target node --id demo --payload '{}', the CLI displays each tick event in real time through the SSE pipeline.

Using the Flue CLI for SSE Streaming

No code changes are required to enable streaming when using the Flue CLI. Simply execute:

flue run my-agent --target node --id test-1 --payload '{"name":"Alice"}'

The CLI automatically adds Accept: text/event-stream to the request headers and streams events to the terminal, providing immediate visibility into agent execution.

Summary

  • Request detection in packages/sdk/src/runtime/handle-agent.ts selects SSE mode when the Accept header contains text/event-stream and X-Webhook is absent.
  • Stream construction uses a TransformStream with writeSSE and writeHeartbeat helpers to format and send events.
  • Heartbeat mechanism sends : heartbeat comments every 15 seconds to keep connections open through proxies.
  • Event lifecycle connects ctx.emitEvent() to the SSE stream via setEventCallback, ensuring all agent events reach the client.
  • Client consumption requires the Accept: text/event-stream header and parses double-newline delimited messages to reconstruct events.
  • CLI support provides built-in SSE consumption via flue run, automatically handling stream parsing and display.

Frequently Asked Questions

How does Flue handle connection drops during SSE streaming?

The runtime guards all stream writes with .catch(() => {}) to silently ignore client-closed errors, allowing the agent to complete its execution even if the client disconnects. However, once the connection closes, the client must initiate a new request to receive further updates, as SSE does not support automatic reconnection at the protocol level beyond the browser's default retry behavior.

What is the difference between webhook mode and SSE mode in Flue?

Webhook mode requires the X-Webhook: true header and sends the complete result to a configured callback URL after execution finishes. SSE mode (triggered by Accept: text/event-stream) maintains an open HTTP connection and streams intermediate events in real time, delivering the final result as a run_end event before closing the stream.

Can I replay stored runs using the same SSE format?

Yes. The encodeSseEvent function in packages/sdk/src/runtime/handle-run-routes.ts formats events consistently for both live streams and stored replays. When retrieving historical run data from packages/sdk/src/runtime/run-store.ts, the system uses the same encoding logic to ensure clients receive identically formatted events regardless of whether they are listening live or replaying a persisted execution.

What types of events can agents emit during SSE streaming?

Agents can emit any custom event type via ctx.emitEvent(), but the runtime automatically handles several standard types including message, idle, and run_end. The idle event signals that the agent has finished processing but the stream remains open for the final run_end event, which contains the execution result or error details.

Have a question about this repo?

These articles cover the highlights, but your codebase questions are specific. Give your agent direct access to the source. Share this with your agent to get started:

Share the following with your agent to get started:
curl -s "https://instagit.com/install.md"

Works with
Claude Codex Cursor VS Code OpenClaw Any MCP Client

Maintain an open-source project? Get it listed too →