Streaming Responses

LLM-generated answers can take several seconds — sometimes tens of seconds for complex queries. Streaming delivers tokens to the user as they're generated, giving immediate feedback instead of a blank screen followed by a wall of text.

This chapter covers the full streaming pipeline: Gaia's SSE format, proxying through your FastAPI backend, and consuming the stream in the React frontend.


Why Streaming?

Text Only
Without streaming:                     With streaming:
┌──────────────────────┐               ┌──────────────────────┐
│ User asks question   │               │ User asks question   │
│ ...                  │               │ "The quarterly..."   │  ← 200ms
│ ...                  │               │ "The quarterly re.." │  ← 400ms
│ ... (8 seconds)      │               │ "The quarterly rev.."│  ← 600ms
│ Full answer appears  │               │ ... tokens arrive    │
└──────────────────────┘               │ Full answer complete │  ← 8s
                                       └──────────────────────┘

Streaming reduces perceived latency from seconds to milliseconds. Users see the answer forming in real time, which feels significantly faster even though the total time is the same.


Gaia's SSE Format

The POST /ask/stream endpoint returns a Server-Sent Events (SSE) stream. Each event follows the SSE specification:

Text Only
event: message
data: {"responseString": "The quarterly "}

event: message
data: {"responseString": "revenue was "}

event: message
data: {"responseString": "$4.2 billion."}

event: message
data: {"queryUid": "abc-123", "conversationId": "conv-456", "documents": [...], "finishReason": "stop"}

Key details:

  • Each event has an event field (always message) and a data field containing JSON.
  • The responseString field contains the incremental text token — not the full response so far.
  • The final event includes queryUid, conversationId, documents, and finishReason.
  • Events are separated by blank lines.
  • Lines starting with : are comments (used as keepalives).
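
To make the framing rules concrete, here is a minimal sketch of parsing a buffer of complete SSE events by hand (parse_sse_events is an illustrative helper, not part of the SDK; it assumes each data payload is JSON):

Python
import json

def parse_sse_events(raw: str):
    """Yield (event, payload) pairs from a buffer of complete SSE events."""
    for block in raw.strip().split("\n\n"):        # events are separated by blank lines
        event, data = "message", None
        for line in block.split("\n"):
            if line.startswith(":"):               # comment line, used as a keepalive
                continue
            if line.startswith("event: "):
                event = line[len("event: "):]
            elif line.startswith("data: "):
                data = line[len("data: "):]
        if data is not None:
            yield event, json.loads(data)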

SSE Field Reference

Field            Type    Description
responseString   string  Incremental text token
queryUid         string  Unique query ID (final event only)
conversationId   string  Conversation ID (final event only)
documents        array   Source documents (final event only)
finishReason     string  "stop", "length", etc. (final event only)
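
If you want typed access to the final event's fields on the backend, a small dataclass along these lines can help (a sketch; FinalEvent is a hypothetical helper, not an SDK type):

Python
from dataclasses import dataclass
from typing import Any

@dataclass
class FinalEvent:
    """Typed view of the final SSE event's payload."""
    query_uid: str
    conversation_id: str
    documents: list[dict[str, Any]]
    finish_reason: str

    @classmethod
    def from_payload(cls, payload: dict) -> "FinalEvent":
        return cls(
            query_uid=payload["queryUid"],
            conversation_id=payload["conversationId"],
            documents=payload.get("documents", []),
            finish_reason=payload["finishReason"],
        )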

Backend: Proxying SSE from Gaia

Your backend proxies the SSE stream from Gaia to the frontend using FastAPI's StreamingResponse. This keeps the API key on the server and lets you add logging, filtering, or transformation.

Using the SDK's ask_stream_iter

The SDK's ask_stream_iter() method yields individual StreamChunk objects. Wrap it in an async generator and return it as a StreamingResponse:

Python
import json

from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from gaia_sdk import GaiaClient
from gaia_sdk.streaming import StreamChunk

from backend.api.dependencies import get_gaia_client
from backend.models.api_models import AskRequest

router = APIRouter()


@router.post("/ask/stream", tags=["Ask"])
async def ask_stream(
    request: AskRequest,
    client: GaiaClient = Depends(get_gaia_client),
):
    """Proxy a streaming RAG query from Gaia to the frontend."""

    async def event_generator():
        try:
            async for chunk in client.ask_stream_iter(
                dataset_names=request.dataset_names,
                query=request.query,
                conversation_id=request.conversation_id,
            ):
                yield f"event: {chunk.event}\ndata: {chunk.data}\n\n"
        except Exception as exc:
            # Send the error as a final SSE event so the frontend can handle it
            error_data = json.dumps({"error": str(exc)})
            yield f"event: error\ndata: {error_data}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )

Disable response buffering

Reverse proxies (nginx, AWS ALB) often buffer responses. The X-Accel-Buffering: no header tells nginx to stream through. For other proxies, check their documentation for similar settings.
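
The proxy is also the natural place for the logging, filtering, or transformation mentioned above. A sketch of a variant event_generator that counts token events as they pass through (the logger name is an assumption):

Python
import json
import logging

logger = logging.getLogger("gaia.proxy")

async def event_generator():
    token_count = 0
    try:
        async for chunk in client.ask_stream_iter(
            dataset_names=request.dataset_names,
            query=request.query,
        ):
            # chunk.parsed is the SDK's parsed JSON payload (or None), see below
            if chunk.parsed and chunk.parsed.get("responseString"):
                token_count += 1
            yield f"event: {chunk.event}\ndata: {chunk.data}\n\n"
        logger.info("Stream finished after %d token events", token_count)
    except Exception as exc:
        yield f"event: error\ndata: {json.dumps({'error': str(exc)})}\n\n"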

Using ask_stream for Accumulated Results

If you don't need to forward individual chunks — for example, you want to process the full answer before responding — use ask_stream() instead:

Python
@router.post("/ask/stream-complete", tags=["Ask"])
async def ask_stream_complete(
    request: AskRequest,
    client: GaiaClient = Depends(get_gaia_client),
):
    """Use streaming internally but return the full result."""
    result = await client.ask_stream(
        dataset_names=request.dataset_names,
        query=request.query,
    )
    return {
        "response": result.full_text,
        "queryUid": result.query_uid,
        "conversationId": result.conversation_id,
        "documents": result.documents,
        "finishReason": result.finish_reason,
    }

Frontend: Consuming the Stream

Option 1: fetch + ReadableStream

The fetch API exposes the response body as a ReadableStream. This is the most flexible approach and works in all modern browsers.

TypeScript
// src/hooks/useStreaming.ts

import { useChatStore } from "../stores/chatStore"; // adjust paths to your project
import { getSessionId } from "../lib/session";

export function useStreaming() {
  const {
    addUserMessage,
    addAssistantMessage,
    updateAssistantMessage,
    setAssistantComplete,
    setConversationId,
    setLoading,
    conversationId,
  } = useChatStore();

  const sendMessage = async (datasetNames: string[], query: string) => {
    addUserMessage(query);
    setLoading(true);

    const assistantId = `msg-${Date.now()}`;
    addAssistantMessage(assistantId, "");

    try {
      const response = await fetch("/api/v1/ask/stream", {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          "X-Session-ID": getSessionId() ?? "",
        },
        body: JSON.stringify({ datasetNames, queryString: query, conversationId }),
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      if (!response.body) throw new Error("No response body");

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let accumulated = "";
      let buffer = "";
      let completed = false; // set when the final event (finishReason) arrives

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        // Split on double newlines (SSE event boundaries)
        const events = buffer.split("\n\n");
        buffer = events.pop() ?? "";

        for (const event of events) {
          for (const line of event.split("\n")) {
            if (line.startsWith("data: ")) {
              const data = line.slice(6);
              try {
                const parsed = JSON.parse(data);
                if (parsed.responseString) {
                  accumulated += parsed.responseString;
                  updateAssistantMessage(assistantId, accumulated);
                }
                if (parsed.conversationId) {
                  setConversationId(parsed.conversationId);
                }
                if (parsed.finishReason) {
                  setAssistantComplete(assistantId, parsed.queryUid, parsed.documents);
                  completed = true;
                }
                if (parsed.error) {
                  throw new Error(parsed.error);
                }
              } catch (e: any) {
                if (!(e instanceof SyntaxError)) {
                  // JSON parsed but carried an error payload; propagate it
                  throw e;
                }
                // Non-JSON data: treat it as raw text
                accumulated += data;
                updateAssistantMessage(assistantId, accumulated);
              }
            }
          }
        }
      }

      if (!completed) setAssistantComplete(assistantId); // stream ended without a final event
    } catch (error: any) {
      updateAssistantMessage(assistantId, `Error: ${error.message}`);
      setAssistantComplete(assistantId);
    } finally {
      setLoading(false);
    }
  };

  return { sendMessage };
}

Option 2: EventSource (GET only)

EventSource is the browser's native SSE client, but it only supports GET requests. Since Gaia's streaming endpoint is POST, you'd need a GET wrapper on your backend:

Python
@router.get("/ask/stream/{conversation_id}", tags=["Ask"])
async def ask_stream_get(
    conversation_id: str,
    q: str,
    datasets: str,  # comma-separated
    client: GaiaClient = Depends(get_gaia_client),
):
    dataset_list = [d.strip() for d in datasets.split(",")]
    # ... same streaming logic ...

TypeScript
const source = new EventSource(
  `/api/v1/ask/stream/${convId}?q=${encodeURIComponent(query)}&datasets=${datasets.join(",")}`
);

source.onmessage = (event) => {
  const parsed = JSON.parse(event.data);
  // handle tokens...
};

source.onerror = () => {
  source.close();
};

When to use EventSource

EventSource is simpler but limited to GET requests and doesn't support custom headers. Use fetch + ReadableStream for POST-based endpoints and when you need the X-Session-ID header.


SDK Streaming Methods Compared

Method              Returns                      Use Case
ask_stream()        StreamResult                 Need the full result; streaming is an internal optimization
ask_stream_iter()   AsyncIterator[StreamChunk]   Need to forward individual chunks to the frontend

ask_stream() — Accumulated Result

Python
result = await client.ask_stream(["my-dataset"], "What happened?")
# result.full_text = "The complete answer..."
# result.documents = [...]
# result.query_uid = "abc-123"

ask_stream_iter() — Chunk by Chunk

Python
async for chunk in client.ask_stream_iter(["my-dataset"], "What happened?"):
    # chunk.event = "message"
    # chunk.data = '{"responseString": "The "}'
    # chunk.parsed = {"responseString": "The "}  (or None if not JSON)
    pass

Handling Partial JSON and Connection Drops

Partial JSON

SSE events can arrive split across network reads, so a single read may end mid-line or mid-event. The SDK handles this by buffering lines until a complete event (terminated by a blank line) is received. On the frontend, the buffer-and-split pattern in useStreaming achieves the same:

TypeScript
// Buffer incomplete events
buffer += decoder.decode(value, { stream: true });
const events = buffer.split("\n\n");
buffer = events.pop() ?? "";  // last element may be incomplete
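
The same pattern in Python, roughly what the SDK does internally when reading the response (handle_event is a hypothetical callback):

Python
buffer = ""
async for raw in response.aiter_text():
    buffer += raw
    events = buffer.split("\n\n")
    buffer = events.pop()        # last element may be an incomplete event
    for event in events:
        handle_event(event)      # hypothetical: parse and dispatch one complete event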

Connection Drops

If the connection drops mid-stream, both sides need to handle it.

On the backend, the ask_stream_iter() generator raises an httpx exception when the underlying connection closes. Wrap the generator in a try/except to handle this gracefully:

Python
import json
import logging

import httpx

logger = logging.getLogger(__name__)


async def event_generator():
    try:
        async for chunk in client.ask_stream_iter(...):
            yield f"event: {chunk.event}\ndata: {chunk.data}\n\n"
    except httpx.RemoteProtocolError:
        logger.warning("Connection dropped during stream")
    except Exception as exc:
        error_data = json.dumps({"error": str(exc)})
        yield f"event: error\ndata: {error_data}\n\n"

On the frontend, the reader.read() call resolves with done: true if the connection closes cleanly, or throws if it drops unexpectedly:

TypeScript
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // process chunks...
  }
} catch (error) {
  // Connection dropped — show partial result + error
  updateAssistantMessage(assistantId, accumulated + "\n\n[Connection lost]");
}

Testing Streaming Endpoints

curl

Bash
curl -N -X POST http://localhost:8000/api/v1/ask/stream \
  -H "Content-Type: application/json" \
  -H "X-Session-ID: your-session-id" \
  -d '{"datasetNames": ["my-dataset"], "queryString": "Summarize the Q4 report"}'

The -N flag disables output buffering so you see events as they arrive.

Python script

Python
import asyncio

import httpx

async def test_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/api/v1/ask/stream",
            json={"datasetNames": ["my-dataset"], "queryString": "Summarize Q4"},
            headers={"X-Session-ID": "your-session-id"},
        ) as response:
            async for line in response.aiter_lines():
                print(line)

asyncio.run(test_stream())
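
To check token accumulation end to end, a variant that rebuilds the full answer from the responseString fields (a sketch; it assumes every data line carries valid JSON):

Python
import asyncio
import json

import httpx

async def consume_stream() -> str:
    accumulated = ""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/api/v1/ask/stream",
            json={"datasetNames": ["my-dataset"], "queryString": "Summarize Q4"},
            headers={"X-Session-ID": "your-session-id"},
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    payload = json.loads(line[len("data: "):])
                    accumulated += payload.get("responseString", "")
    return accumulated

print(asyncio.run(consume_stream()))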

Next Steps