Streaming Responses¶
LLM-generated answers can take several seconds — sometimes tens of seconds for complex queries. Streaming delivers tokens to the user as they're generated, giving immediate feedback instead of a blank screen followed by a wall of text.
This chapter covers the full streaming pipeline: Gaia's SSE format, proxying through your FastAPI backend, and consuming the stream in the React frontend.
Why Streaming?¶
Without streaming:                    With streaming:

┌──────────────────────┐              ┌─────────────────────────┐
│ User asks question   │              │ User asks question      │
│ ...                  │              │ "The quarterly..."      │ ← 200ms
│ ...                  │              │ "The quarterly rev..."  │ ← 400ms
│ ... (8 seconds)      │              │ "The quarterly reven..."│ ← 600ms
│ Full answer appears  │              │ ... tokens arrive       │
└──────────────────────┘              │ Full answer complete    │ ← 8s
                                      └─────────────────────────┘
Streaming reduces perceived latency from seconds to milliseconds. Users see the answer forming in real time, which feels significantly faster even though the total time is the same.
Gaia's SSE Format¶
The POST /ask/stream endpoint returns a Server-Sent Events (SSE) stream. Each event follows the SSE specification:
event: message
data: {"responseString": "The quarterly "}
event: message
data: {"responseString": "revenue was "}
event: message
data: {"responseString": "$4.2 billion."}
event: message
data: {"queryUid": "abc-123", "conversationId": "conv-456", "documents": [...], "finishReason": "stop"}
Key details:
- Each event has an `event` field (always `message`) and a `data` field containing JSON.
- The `responseString` field contains the incremental text token, not the full response so far.
- The final event includes `queryUid`, `conversationId`, `documents`, and `finishReason`.
- Events are separated by blank lines.
- Lines starting with `:` are comments (used as keepalives).
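Taken together, those rules can be sketched as a minimal event parser. This is a rough Python sketch for illustration, not the SDK's implementation; the field names follow the format shown above:

```python
import json

def parse_sse_event(raw_event: str) -> dict:
    """Parse one SSE event block (the lines up to a blank-line separator)."""
    event = {"event": None, "data": None}
    for line in raw_event.split("\n"):
        if line.startswith(":"):  # comment line, used as a keepalive
            continue
        if line.startswith("event: "):
            event["event"] = line[len("event: "):]
        elif line.startswith("data: "):
            event["data"] = json.loads(line[len("data: "):])
    return event

parsed = parse_sse_event('event: message\ndata: {"responseString": "The quarterly "}')
# parsed["event"] == "message"
# parsed["data"]["responseString"] == "The quarterly "
```

Keepalive comment lines are skipped outright, so they never reach the JSON parser.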
SSE Field Reference¶
| Field | Type | Description |
|---|---|---|
| `responseString` | string | Incremental text token |
| `queryUid` | string | Unique query ID (final event only) |
| `conversationId` | string | Conversation ID (final event only) |
| `documents` | array | Source documents (final event only) |
| `finishReason` | string | `"stop"`, `"length"`, etc. (final event only) |
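Since only the final event carries `finishReason` (per the table), a consumer can route events purely on field presence. A small sketch:

```python
def is_final_event(payload: dict) -> bool:
    """Token events carry responseString; only the final event carries finishReason."""
    return "finishReason" in payload

is_final_event({"responseString": "The quarterly "})                  # False
is_final_event({"queryUid": "abc-123", "finishReason": "stop"})       # True
```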
Backend: Proxying SSE from Gaia¶
Your backend proxies the SSE stream from Gaia to the frontend using FastAPI's StreamingResponse. This keeps the API key on the server and lets you add logging, filtering, or transformation.
Using the SDK's ask_stream_iter¶
The SDK's ask_stream_iter() method yields individual StreamChunk objects. Wrap it in an async generator and return it as a StreamingResponse:
import json

from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from gaia_sdk import GaiaClient
from gaia_sdk.streaming import StreamChunk

from backend.api.dependencies import get_gaia_client
from backend.models.api_models import AskRequest

router = APIRouter()

@router.post("/ask/stream", tags=["Ask"])
async def ask_stream(
    request: AskRequest,
    client: GaiaClient = Depends(get_gaia_client),
):
    """Proxy a streaming RAG query from Gaia to the frontend."""

    async def event_generator():
        try:
            async for chunk in client.ask_stream_iter(
                dataset_names=request.dataset_names,
                query=request.query,
                conversation_id=request.conversation_id,
            ):
                yield f"event: {chunk.event}\ndata: {chunk.data}\n\n"
        except Exception as exc:
            # Send the error as a final SSE event so the frontend can handle it
            error_data = json.dumps({"error": str(exc)})
            yield f"event: error\ndata: {error_data}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
Disable response buffering
Reverse proxies (nginx, AWS ALB) often buffer responses. The X-Accel-Buffering: no header tells nginx to stream through. For other proxies, check their documentation for similar settings.
Using ask_stream for Accumulated Results¶
If you don't need to forward individual chunks — for example, you want to process the full answer before responding — use ask_stream() instead:
@router.post("/ask/stream-complete", tags=["Ask"])
async def ask_stream_complete(
    request: AskRequest,
    client: GaiaClient = Depends(get_gaia_client),
):
    """Use streaming internally but return the full result."""
    result = await client.ask_stream(
        dataset_names=request.dataset_names,
        query=request.query,
    )
    return {
        "response": result.full_text,
        "queryUid": result.query_uid,
        "conversationId": result.conversation_id,
        "documents": result.documents,
        "finishReason": result.finish_reason,
    }
Frontend: Consuming the Stream¶
Option 1: fetch with ReadableStream (Recommended)¶
The fetch API provides a ReadableStream on the response body. This is the most flexible approach and works in all modern browsers.
// src/hooks/useStreaming.ts
// Adjust these import paths to match your project layout.
import { useChatStore } from "../stores/chatStore";
import { getSessionId } from "../utils/session";

export function useStreaming() {
  const {
    addUserMessage,
    addAssistantMessage,
    updateAssistantMessage,
    setAssistantComplete,
    setConversationId,
    setLoading,
    conversationId,
  } = useChatStore();

  const sendMessage = async (datasetNames: string[], query: string) => {
    addUserMessage(query);
    setLoading(true);
    const assistantId = `msg-${Date.now()}`;
    addAssistantMessage(assistantId, "");

    try {
      const response = await fetch("/api/v1/ask/stream", {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          "X-Session-ID": getSessionId() ?? "",
        },
        body: JSON.stringify({ datasetNames, queryString: query, conversationId }),
      });
      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      if (!response.body) throw new Error("No response body");

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let accumulated = "";
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });

        // Split on double newlines (SSE event boundaries);
        // the last element may be an incomplete event, so keep it buffered.
        const events = buffer.split("\n\n");
        buffer = events.pop() ?? "";

        for (const event of events) {
          for (const line of event.split("\n")) {
            if (!line.startsWith("data: ")) continue;
            const data = line.slice(6);
            try {
              const parsed = JSON.parse(data);
              if (parsed.responseString) {
                accumulated += parsed.responseString;
                updateAssistantMessage(assistantId, accumulated);
              }
              if (parsed.conversationId) {
                setConversationId(parsed.conversationId);
              }
              if (parsed.finishReason) {
                setAssistantComplete(assistantId, parsed.queryUid, parsed.documents);
              }
              if (parsed.error) {
                throw new Error(parsed.error);
              }
            } catch (e: any) {
              if (!(e instanceof SyntaxError)) {
                // JSON parse succeeded but the payload carried an error event
                throw e;
              }
              // Non-JSON data: treat as raw text
              accumulated += data;
              updateAssistantMessage(assistantId, accumulated);
            }
          }
        }
      }
      // Fallback in case the stream ended without a finishReason event
      setAssistantComplete(assistantId);
    } catch (error: any) {
      updateAssistantMessage(assistantId, `Error: ${error.message}`);
      setAssistantComplete(assistantId);
    } finally {
      setLoading(false);
    }
  };

  return { sendMessage };
}
Option 2: EventSource (GET only)¶
EventSource is the browser's native SSE client, but it only supports GET requests. Since Gaia's streaming endpoint is POST, you'd need a GET wrapper on your backend:
@router.get("/ask/stream/{conversation_id}", tags=["Ask"])
async def ask_stream_get(
    conversation_id: str,
    q: str,
    datasets: str,  # comma-separated
    client: GaiaClient = Depends(get_gaia_client),
):
    dataset_list = [d.strip() for d in datasets.split(",")]
    # ... same streaming logic ...

The frontend then connects with the browser's native client:

const source = new EventSource(
  `/api/v1/ask/stream/${convId}?q=${encodeURIComponent(query)}&datasets=${datasets.join(",")}`
);

source.onmessage = (event) => {
  const parsed = JSON.parse(event.data);
  // handle tokens...
};

source.onerror = () => {
  source.close();
};
When to use EventSource
EventSource is simpler but limited to GET requests and doesn't support custom headers. Use fetch + ReadableStream for POST-based endpoints and when you need the X-Session-ID header.
SDK Streaming Methods Compared¶
| Method | Returns | Use Case |
|---|---|---|
| `ask_stream()` | `StreamResult` | Need the full result; streaming is an internal optimization |
| `ask_stream_iter()` | `AsyncIterator[StreamChunk]` | Need to forward individual chunks to the frontend |
ask_stream() — Accumulated Result¶
result = await client.ask_stream(["my-dataset"], "What happened?")
# result.full_text = "The complete answer..."
# result.documents = [...]
# result.query_uid = "abc-123"
ask_stream_iter() — Chunk by Chunk¶
async for chunk in client.ask_stream_iter(["my-dataset"], "What happened?"):
    # chunk.event = "message"
    # chunk.data = '{"responseString": "The "}'
    # chunk.parsed = {"responseString": "The "} (or None if not JSON)
    pass
Handling Partial JSON and Connection Drops¶
Partial JSON¶
SSE data lines can occasionally be split across TCP frames. The SDK handles this by buffering lines until a complete event (terminated by a blank line) is received. On the frontend, the buffer-and-split pattern in useStreaming achieves the same:
// Buffer incomplete events
buffer += decoder.decode(value, { stream: true });
const events = buffer.split("\n\n");
buffer = events.pop() ?? ""; // last element may be incomplete
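The same pattern, sketched in Python, makes the invariant explicit: an event is emitted only once its terminating blank line has arrived, regardless of where the network chunk boundaries fall:

```python
def feed(buffer: str, chunk: str) -> tuple[list[str], str]:
    """Append a network chunk to the buffer; return (complete_events, remainder)."""
    buffer += chunk
    events = buffer.split("\n\n")
    return events[:-1], events[-1]  # last element may be an incomplete event

# An event split across two reads is only emitted once it is complete:
events, buf = feed("", 'event: message\ndata: {"resp')
# events == [], buf holds the partial event
events, buf = feed(buf, 'onseString": "Hi"}\n\n')
# events == ['event: message\ndata: {"responseString": "Hi"}'], buf == ""
```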
Connection Drops¶
If the connection drops mid-stream, the ask_stream_iter() generator raises an httpx exception when the underlying connection closes. Wrap the generator in a try/except to handle this gracefully:
import json
import logging

import httpx

logger = logging.getLogger(__name__)

async def event_generator():
    try:
        async for chunk in client.ask_stream_iter(...):
            yield f"event: {chunk.event}\ndata: {chunk.data}\n\n"
    except httpx.RemoteProtocolError:
        logger.warning("Connection dropped during stream")
    except Exception as exc:
        error_data = json.dumps({"error": str(exc)})
        yield f"event: error\ndata: {error_data}\n\n"
On the frontend, the reader.read() call resolves with done: true if the connection closes cleanly, or throws if it drops unexpectedly.
Testing Streaming Endpoints¶
curl¶
curl -N -X POST http://localhost:8000/api/v1/ask/stream \
-H "Content-Type: application/json" \
-H "X-Session-ID: your-session-id" \
-d '{"datasetNames": ["my-dataset"], "queryString": "Summarize the Q4 report"}'
The -N flag disables output buffering so you see events as they arrive.
Python script¶
import asyncio

import httpx

async def test_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/api/v1/ask/stream",
            json={"datasetNames": ["my-dataset"], "queryString": "Summarize Q4"},
            headers={"X-Session-ID": "your-session-id"},
        ) as response:
            async for line in response.aiter_lines():
                print(line)

asyncio.run(test_stream())
Next Steps¶
- Refine & Feedback — Post-processing answers with document selection.
- Error Handling — Handling errors during streams.
- The Gaia Client Library — SDK streaming internals.