# Implementing Server-Sent Events Streaming for Real-Time Agent Output in Flue

> Implement Server-Sent Events streaming in Flue for real-time LLM output monitoring. Stream agent events and results over HTTP, eliminating the need for polling and enhancing your application's responsiveness.

- Repository: [withastro/flue](https://github.com/withastro/flue)
- Tags: tutorial
- Published: 2026-05-11

---

**Flue's runtime supports Server-Sent Events (SSE) to stream intermediate agent events and final results over HTTP, enabling real-time LLM output monitoring without polling.**

Instead of polling for status, a client opens one HTTP request and keeps it open, and the server pushes each agent event down that connection as soon as it occurs. The sections below trace this path through the withastro/flue runtime, from header detection on the server to parsing on the client.

## Detecting SSE Mode from Request Headers

When a POST request hits `/agents/<agent-name>/<id>`, `handleAgentRequest` examines the request headers to choose a response mode. In [`packages/sdk/src/runtime/handle-agent.ts`](https://github.com/withastro/flue/blob/main/packages/sdk/src/runtime/handle-agent.ts), the logic inspects the `Accept` header and the `X-Webhook` flag:

```ts
// packages/sdk/src/runtime/handle-agent.ts#L96-L104
const accept = request.headers.get('accept') || '';
const isWebhook = request.headers.get('x-webhook') === 'true';
const isSSE = accept.includes('text/event-stream') && !isWebhook;

// Simplified dispatch; the arguments passed to each mode are elided.
if (isWebhook) {
  runWebhookMode(/* ... */);
} else if (isSSE) {
  runSseMode(/* ... */);
} else {
  runSyncMode(/* ... */);
}
```

The **SSE mode** triggers only when the client sends `Accept: text/event-stream` and omits the `X-Webhook: true` header. This distinction allows the same endpoint to support synchronous, webhook, and streaming responses depending on client needs.
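The precedence rule is easy to check in isolation. The following is a minimal sketch, not Flue's code: `selectResponseMode` and `ResponseMode` are hypothetical names, and a plain object stands in for the request's `Headers`.

```ts
// Hypothetical helper that mirrors the header checks described above.
type ResponseMode = 'webhook' | 'sse' | 'sync';

function selectResponseMode(headers: Record<string, string>): ResponseMode {
  // Normalize names to lower case, since HTTP header names are case-insensitive.
  const lower = new Map<string, string>();
  for (const [name, value] of Object.entries(headers)) {
    lower.set(name.toLowerCase(), value);
  }

  const accept = lower.get('accept') ?? '';
  const isWebhook = lower.get('x-webhook') === 'true';

  // Webhook wins over SSE, matching the `&& !isWebhook` guard in the excerpt.
  if (isWebhook) return 'webhook';
  if (accept.includes('text/event-stream')) return 'sse';
  return 'sync';
}

console.log(selectResponseMode({ Accept: 'text/event-stream' })); // sse
console.log(selectResponseMode({ Accept: 'text/event-stream', 'X-Webhook': 'true' })); // webhook
console.log(selectResponseMode({ Accept: 'application/json' })); // sync
```

Note that only the exact value `true` enables webhook mode, so a request carrying `X-Webhook: false` alongside `Accept: text/event-stream` still streams.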

## Building the SSE Response Stream

The `runSseMode` function in [`packages/sdk/src/runtime/handle-agent.ts`](https://github.com/withastro/flue/blob/main/packages/sdk/src/runtime/handle-agent.ts) creates a `TransformStream` whose readable side becomes the HTTP response body. The writable side receives SSE-formatted chunks:

```ts
// packages/sdk/src/runtime/handle-agent.ts#L65-L78
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
const encoder = new TextEncoder();

```

Two helper functions manage the stream:

- **`writeSSE(data, eventType)`** formats events with proper SSE syntax (`event: …`, `id: …`, `data: …`) and writes them to the stream.
- **`writeHeartbeat()`** emits comment lines (`: heartbeat`) to keep the connection alive.

Both helpers guard against client-closed errors, silently ignoring write failures so the agent can complete its execution.
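The helper bodies are not reproduced in this article, so the following is only a sketch of how such guarded writers might look. `createSseHelpers` and `ChunkWriter` are hypothetical names, and the explicit `id` parameter is an assumption made to keep the example self-contained.

```ts
// Minimal writer shape; the writer returned by `writable.getWriter()` satisfies it.
interface ChunkWriter {
  write(chunk: Uint8Array): Promise<void>;
}

// Hypothetical factory returning the two helpers described above.
function createSseHelpers(writer: ChunkWriter) {
  const encoder = new TextEncoder();

  // Encode and write; a failed write (for example, a closed client) is swallowed.
  const safeWrite = async (text: string): Promise<void> => {
    try {
      await writer.write(encoder.encode(text));
    } catch {
      // Ignore the failure so the agent can keep running.
    }
  };

  return {
    writeSSE: (data: unknown, eventType: string, id = 0): Promise<void> =>
      safeWrite(`event: ${eventType}\nid: ${id}\ndata: ${JSON.stringify(data)}\n\n`),
    // Lines starting with ":" are SSE comments, which clients ignore.
    writeHeartbeat: (): Promise<void> => safeWrite(': heartbeat\n\n'),
  };
}

// Usage: a writer that always rejects still yields resolved promises.
const helpers = createSseHelpers({ write: () => Promise.reject(new Error('client closed')) });
helpers.writeSSE({ type: 'tick', count: 1 }, 'tick').then(() => console.log('write ignored'));
```

Swallowing the error inside the helper keeps every call site free of its own error handling, at the cost of hiding genuine write failures from the caller.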

## Maintaining Connections with Heartbeats

To prevent intermediate proxies or browsers from closing an idle connection, Flue sends a heartbeat every 15 seconds. The interval is defined as a constant in the handler:

```ts
// packages/sdk/src/runtime/handle-agent.ts#L11-L14
export const SSE_HEARTBEAT_MS = 15_000;
const heartbeat = setInterval(() => writeHeartbeat().catch(() => {}), SSE_HEARTBEAT_MS);

```

When the agent finishes or throws an error, the runtime tears the stream down: it clears this interval, removes the event callback, and closes the writer once the final `run_end` event has been written.
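One common way to guarantee that the timer is cleared on both the success and the error path is a `try`/`finally` wrapper. The sketch below illustrates that pattern under stated assumptions; `withHeartbeat` is a hypothetical helper, not a function from the Flue source.

```ts
// Hypothetical helper: run `work` while calling `beat` every `intervalMs`.
async function withHeartbeat<T>(
  beat: () => Promise<void>,
  intervalMs: number,
  work: () => Promise<T>,
): Promise<T> {
  // Swallow beat failures so a closed client cannot crash the run.
  const timer = setInterval(() => {
    beat().catch(() => {});
  }, intervalMs);
  try {
    return await work();
  } finally {
    // Runs whether `work` resolved or threw, so the timer never leaks.
    clearInterval(timer);
  }
}

// Usage with a short interval so the effect is visible quickly.
let beats = 0;
withHeartbeat(
  async () => { beats += 1; },
  10,
  () => new Promise<string>((resolve) => setTimeout(() => resolve('done'), 55)),
).then((result) => console.log(result, 'after', beats, 'heartbeats'));
```

A leaked interval would keep a Node process alive and keep writing to a closed stream, which is why the cleanup belongs in `finally` rather than after the `await`.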

## Managing the Agent Event Lifecycle

After creating a `TransformStream`, `runSseMode` instantiates a run lifecycle and registers an event callback on the agent context. The context methods `setEventCallback` and `emitEvent` are defined in [`packages/sdk/src/context.ts`](https://github.com/withastro/flue/blob/main/packages/sdk/src/context.ts), connecting agent logic to the runtime stream:

```ts
// packages/sdk/src/runtime/handle-agent.ts#L26-L31
ctx.setEventCallback((event) => {
  if (event.type === 'idle') isIdle = true;
  writeSSE(event, event.type).catch(() => {});
});

```

Every time the agent calls `ctx.emitEvent()`, the callback streams the event via `writeSSE`. The runtime forces an `idle` event before the terminal `run_end` to guarantee a clean end-of-stream. The final `run_end` event contains either the `result` or `error` payload, automatically emitted by the run-lifecycle code.
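To make the ordering concrete, here is a toy model of the callback wiring. `SketchContext` and `AgentEvent` are hypothetical stand-ins for the real context in `context.ts`, and the final `run_end` is emitted by hand here, whereas in Flue the run-lifecycle code produces it.

```ts
type AgentEvent = { type: string; [key: string]: unknown };
type EventCallback = (event: AgentEvent) => void;

// Hypothetical stand-in for the agent context: one optional listener.
class SketchContext {
  private callback: EventCallback | undefined;

  setEventCallback(callback: EventCallback | undefined): void {
    this.callback = callback;
  }

  emitEvent(event: AgentEvent): void {
    // With no listener registered, events are simply dropped.
    this.callback?.(event);
  }
}

const ctx = new SketchContext();
const wire: string[] = []; // stands in for the SSE stream
let isIdle = false;

ctx.setEventCallback((event) => {
  if (event.type === 'idle') isIdle = true;
  wire.push(event.type);
});

ctx.emitEvent({ type: 'tick', count: 1 });
// Force an idle event if the agent never emitted one, then finish the run.
if (!isIdle) ctx.emitEvent({ type: 'idle' });
ctx.emitEvent({ type: 'run_end', result: { ok: true } });
ctx.setEventCallback(undefined); // detach once the run is over

console.log(wire.join(' -> ')); // tick -> idle -> run_end
```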

## Encoding SSE Events

Low-level SSE text encoding resides in [`packages/sdk/src/runtime/handle-run-routes.ts`](https://github.com/withastro/flue/blob/main/packages/sdk/src/runtime/handle-run-routes.ts). The `encodeSseEvent` function formats events for the wire, using the same structure for live streams and stored event replays:

```ts
// packages/sdk/src/runtime/handle-run-routes.ts#L7-L13
function encodeSseEvent(event: FlueEvent): string {
  const id = typeof event.eventIndex === 'number' ? event.eventIndex : 0;
  return [
    `event: ${event.type}`,
    `id: ${id}`,
    `data: ${JSON.stringify(event)}`,
    '',
    '',
  ].join('\n');
}

```

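This is the standard SSE wire format, so the Flue CLI and any spec-compliant SSE parser can consume it. Note that the browser `EventSource` API only issues GET requests, so a browser client talking to the POST endpoint has to read the stream with `fetch` and parse the frames itself.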
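Going the other way, a consumer has to turn one frame back into an object. The sketch below shows a small parser for the three fields used above; `parseSseFrame` and `ParsedSseEvent` are hypothetical names, and the parser deliberately ignores parts of the SSE grammar (such as `retry:`) that `encodeSseEvent` never produces.

```ts
interface ParsedSseEvent {
  event: string;
  id: string;
  data: unknown;
}

// Hypothetical inverse of `encodeSseEvent` for a single frame.
function parseSseFrame(frame: string): ParsedSseEvent | null {
  let event = 'message'; // SSE default when no "event:" field is present
  let id = '';
  const data: string[] = [];

  for (const line of frame.split('\n')) {
    if (line === '' || line.startsWith(':')) continue; // blank line or comment
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one optional space

    if (field === 'event') event = value;
    else if (field === 'id') id = value;
    else if (field === 'data') data.push(value);
  }

  // Frames without data (such as heartbeats) carry no event.
  if (data.length === 0) return null;
  return { event, id, data: JSON.parse(data.join('\n')) };
}

const sample = 'event: tick\nid: 4\ndata: {"type":"tick","count":5,"eventIndex":4}\n\n';
console.log(parseSseFrame(sample));
console.log(parseSseFrame(': heartbeat\n\n')); // null
```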

## Consuming the Stream from the Client

Clients must initiate requests with the `Accept: text/event-stream` header to receive the SSE stream. The Flue CLI (`flue run`) implements this in [`packages/cli/bin/flue.ts`](https://github.com/withastro/flue/blob/main/packages/cli/bin/flue.ts) through the `consumeSSE` function:

```ts
// packages/cli/bin/flue.ts#L34-L42
const res = await fetch(url, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Accept: 'text/event-stream',
  },
  body: payload,
  signal,
});

```

The consumer reads the response body chunk-by-chunk, splits on double-newlines to isolate individual SSE events, and parses the `data:` payload as JSON. It reacts to `run_end` for the final result or `error` for in-stream failures.

## Complete Client Implementation Example

The following JavaScript example demonstrates how to consume the SSE stream from a custom client:

```js
import fetch from 'node-fetch';

async function runAgent(name, id, payload) {
  const url = `http://localhost:3583/agents/${name}/${id}`;
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Accept: 'text/event-stream',
    },
    body: JSON.stringify(payload),
  });

  if (!res.ok) throw new Error(`HTTP ${res.status}`);

  const decoder = new TextDecoder();
  let buffer = '';

  for await (const chunk of res.body) {
    buffer += decoder.decode(chunk, { stream: true });
    const parts = buffer.split('\n\n');
    buffer = parts.pop() ?? '';

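    for (const part of parts) {
      if (!part.trim()) continue;
      const dataLines = part
        .split('\n')
        .filter(l => l.startsWith('data:'))
        .map(l => l.replace(/^data:\s?/, ''));

      // Heartbeat comments (": heartbeat") carry no data lines; skip them
      // instead of passing an empty string to JSON.parse.
      if (dataLines.length === 0) continue;

      const event = JSON.parse(dataLines.join('\n'));

      if (event.type === 'run_end') {
        // run_end carries either a result or an error payload.
        if (event.error) throw new Error(`Run failed: ${JSON.stringify(event.error)}`);
        console.log('✅ Run finished:', event.result);
        return event.result;
      }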

      // Render live events (LLM tokens, tool calls, etc.)
      console.log(`[${event.type}]`, event);
    }
  }
}

```

This implementation buffers partial chunks, splits on SSE message boundaries, and parses the JSON payload to handle real-time updates.
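The buffering step matters because network chunks do not align with event boundaries. The sketch below isolates it in a hypothetical `SseFrameBuffer` class so the behavior can be observed without a server.

```ts
// Hypothetical helper: accumulates text and releases only complete frames.
class SseFrameBuffer {
  private buffer = '';

  /** Add a decoded text chunk and return every frame it completed. */
  push(chunk: string): string[] {
    this.buffer += chunk;
    const parts = this.buffer.split('\n\n');
    // The last piece is either empty or an unfinished frame; keep it for later.
    this.buffer = parts.pop() ?? '';
    return parts.filter((part) => part.trim() !== '');
  }
}

const frames = new SseFrameBuffer();
// The first chunk ends in the middle of the "data:" field, so nothing is released.
console.log(frames.push('event: tick\nid: 0\nda'));
// The second chunk completes the tick frame and starts a heartbeat comment.
console.log(frames.push('ta: {"type":"tick"}\n\n: heart'));
// The third chunk completes the heartbeat, released as the frame ": heartbeat".
console.log(frames.push('beat\n\n'));
```

Released frames still include comment-only frames such as the heartbeat, so the caller has to skip frames that contain no `data:` line before parsing JSON.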

## Defining an Agent that Emits Events

To stream data, agents must emit events using the context API. Here is an example agent that pushes incremental updates:

```ts
// agents/echo.ts
import { init } from '@flue/sdk';

export default init({
  async handler(ctx) {
    ctx.emitEvent({ type: 'message', text: 'Starting…' });
    for (let i = 0; i < 5; i++) {
      await new Promise(r => setTimeout(r, 500));
      ctx.emitEvent({ type: 'tick', count: i + 1 });
    }
    ctx.emitEvent({ type: 'message', text: 'Done.' });
    return { ok: true };
  },
});

```

When executed via `flue run echo --target node --id demo --payload '{}'`, the CLI displays each `tick` event in real time through the SSE pipeline.

## Using the Flue CLI for SSE Streaming

No code changes are required to enable streaming when using the Flue CLI. Simply execute:

```bash
flue run my-agent --target node --id test-1 --payload '{"name":"Alice"}'

```

The CLI automatically adds `Accept: text/event-stream` to the request headers and streams events to the terminal, providing immediate visibility into agent execution.

## Summary

- **Request detection** in [`packages/sdk/src/runtime/handle-agent.ts`](https://github.com/withastro/flue/blob/main/packages/sdk/src/runtime/handle-agent.ts) selects SSE mode when the `Accept` header contains `text/event-stream` and `X-Webhook` is not set to `true`.
- **Stream construction** uses a `TransformStream` with `writeSSE` and `writeHeartbeat` helpers to format and send events.
- **Heartbeat mechanism** sends `: heartbeat` comments every 15 seconds to keep connections open through proxies.
- **Event lifecycle** connects `ctx.emitEvent()` to the SSE stream via `setEventCallback`, ensuring all agent events reach the client.
- **Client consumption** requires the `Accept: text/event-stream` header and parses double-newline delimited messages to reconstruct events.
- **CLI support** provides built-in SSE consumption via `flue run`, automatically handling stream parsing and display.

## Frequently Asked Questions

### How does Flue handle connection drops during SSE streaming?

The runtime guards all stream writes with `.catch(() => {})` to silently ignore client-closed errors, allowing the agent to complete its execution even if the client disconnects. Once the connection closes, however, that response delivers no further events. Automatic reconnection is a feature of the browser's `EventSource` API rather than something the server can trigger, so a client that reads the stream through `fetch` must detect the drop and issue a new request itself.

### What is the difference between webhook mode and SSE mode in Flue?

**Webhook mode** requires the `X-Webhook: true` header and sends the complete result to a configured callback URL after execution finishes. **SSE mode** (triggered by `Accept: text/event-stream`) maintains an open HTTP connection and streams intermediate events in real time, delivering the final result as a `run_end` event before closing the stream.

### Can I replay stored runs using the same SSE format?

Yes. The `encodeSseEvent` function in [`packages/sdk/src/runtime/handle-run-routes.ts`](https://github.com/withastro/flue/blob/main/packages/sdk/src/runtime/handle-run-routes.ts) formats events consistently for both live streams and stored replays. When retrieving historical run data from [`packages/sdk/src/runtime/run-store.ts`](https://github.com/withastro/flue/blob/main/packages/sdk/src/runtime/run-store.ts), the system uses the same encoding logic to ensure clients receive identically formatted events regardless of whether they are listening live or replaying a persisted execution.

### What types of events can agents emit during SSE streaming?

Agents can emit any custom event type via `ctx.emitEvent()`, but the runtime automatically handles several standard types including `message`, `idle`, and `run_end`. The `idle` event signals that the agent has finished processing but the stream remains open for the final `run_end` event, which contains the execution result or error details.