Gaia as a LangChain retriever
This guide is for developers learning LangChain and for AI agents generating integration code. It shows how to expose Cohesity Gaia as a LangChain BaseRetriever, so you can plug enterprise RAG search into chains, agents, and LCEL pipelines.
Prerequisites
- Authentication and a working Gaia dataset.
- Exhaustive search: the retriever uses exhaustive_search (not POST /ask), so you retrieve documents without asking Gaia to run the answer LLM on that step. Your LangChain app (or another LLM) controls generation separately.
- Optional: Gaia + Unstructured.io if you also normalize files before indexing.
How to read this page
| If you are… | Do this |
|---|---|
| New to LangChain | Read the Concepts section, then run Minimal example end-to-end. |
| Shipping production code | Read Sync vs async and add logging, retries, and timeouts. |
| An AI agent | Use the Implementer checklist; pin langchain-core to a version your environment supports. |
Concepts (short)
| Term | Meaning here |
|---|---|
| Retriever | Something that turns a string question into a list of documents (text + metadata). In LangChain this is usually langchain_core.documents.Document. |
| Gaia exhaustive search | Returns ranked snippets and metadata (doc_id, filename, filepath, …) for every match in a dataset, with pagination. See Exhaustive Search. |
| Why not GaiaClient.ask() for retrieval? | ask runs Gaia’s full RAG answer (LLM) on the server. A retriever should only fetch context; you attach your own LLM in LangChain if you want. |
Install
Use the same environment where you already install gaia-sdk from this repo (make sdk-install). Versions change often; pin langchain-core in your app’s requirements.txt when you go to production.
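Before pinning, it can help to see which versions are actually installed in the environment. The following is a minimal sketch using only the standard library. The distribution names `langchain-core` and `gaia-sdk` are taken from this guide and may be published under different names in your package index; `installed_versions` and `requirement_lines` are hypothetical helper names.

```python
from __future__ import annotations

from importlib import metadata


def installed_versions(names: list[str]) -> dict[str, str | None]:
    """Return {distribution name: version, or None if it is not installed}."""
    found: dict[str, str | None] = {}
    for name in names:
        try:
            found[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            found[name] = None
    return found


def requirement_lines(versions: dict[str, str | None]) -> list[str]:
    """Format the installed distributions as exact pins for requirements.txt."""
    return [f"{name}=={ver}" for name, ver in versions.items() if ver is not None]


if __name__ == "__main__":
    # Assumed distribution names; adjust to what your environment uses.
    versions = installed_versions(["langchain-core", "gaia-sdk"])
    for name, ver in versions.items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
    print("\n".join(requirement_lines(versions)))
```

The printed `name==version` lines can be copied into requirements.txt once you have confirmed the combination works for your service.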
Custom retriever: GaiaExhaustiveRetriever
The class below:
- Subclasses LangChain’s BaseRetriever.
- Calls GaiaClient.exhaustive_search() with page_size capped by k (max 100 per Gaia API).
- Maps each Gaia hit to Document(page_content=snippet, metadata={...}).
- Implements async retrieval (ainvoke) for FastAPI/Jupyter, and sync invoke only when no asyncio loop is running (see comments).

"""
Gaia exhaustive search → LangChain BaseRetriever.
Environment: GAIA_API_KEY, GAIA_BASE_URL (optional), GAIA_VERIFY_SSL (optional)
"""
from __future__ import annotations

import asyncio
from typing import Any

from gaia_sdk import GaiaClient
from langchain_core.callbacks.manager import (
    AsyncCallbackManagerForRetrieverRun,
    CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from pydantic import ConfigDict, Field


class GaiaExhaustiveRetriever(BaseRetriever):
    """Retrieve top-k snippets from one Gaia dataset via exhaustive search."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    gaia: Any  # active GaiaClient (must already be inside async context manager)
    dataset_name: str
    k: int = Field(default=10, ge=1, le=100, description="Max documents to return")

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun,
    ) -> list[Document]:
        """Sync path: only works when no event loop is running (CLI scripts)."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread, so it is safe to start one.
            return asyncio.run(self._documents_for_query(query))
        raise RuntimeError(
            "GaiaExhaustiveRetriever: inside an active event loop use "
            "`await retriever.ainvoke(query)` instead of `retriever.invoke(query)`."
        )

    async def _aget_relevant_documents(
        self,
        query: str,
        *,
        run_manager: AsyncCallbackManagerForRetrieverRun,
    ) -> list[Document]:
        return await self._documents_for_query(query)

    async def _documents_for_query(self, query: str) -> list[Document]:
        result = await self.gaia.exhaustive_search(
            dataset_name=self.dataset_name,
            query=query,
            page_size=min(self.k, 100),
        )
        out: list[Document] = []
        for hit in (result.documents or [])[: self.k]:
            out.append(
                Document(
                    page_content=(hit.snippet or "").strip(),
                    metadata={
                        "doc_id": hit.doc_id,
                        "filename": hit.filename,
                        "filepath": hit.filepath,
                        "score": hit.score,
                        "source": "gaia_exhaustive",
                    },
                )
            )
        return out
Minimal example (async)

import asyncio

from gaia_sdk import GaiaClient

# GaiaExhaustiveRetriever from the previous section


async def main() -> None:
    async with GaiaClient.from_env() as gaia:
        retriever = GaiaExhaustiveRetriever(
            gaia=gaia,
            dataset_name="my-dataset",
            k=5,
        )
        docs = await retriever.ainvoke("What is our travel reimbursement policy?")
        for i, doc in enumerate(docs, 1):
            print(f"--- {i} score={doc.metadata.get('score')} ---")
            print(doc.page_content[:400])
            print()


if __name__ == "__main__":
    asyncio.run(main())
Optional: one-step RAG chain (LCEL)
Below, langchain_core builds a tiny pipeline: retrieve → format context → prompt → LLM. You must supply an LLM that implements LangChain’s chat interface (for example ChatOpenAI from langchain-openai). This snippet is illustrative; swap models and keys per your org.

import asyncio

from gaia_sdk import GaiaClient
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# from langchain_core.output_parsers import StrOutputParser  # needed for the full RAG chain
# from langchain_openai import ChatOpenAI  # pip install langchain-openai

# GaiaExhaustiveRetriever class defined above


def format_docs(docs: list) -> str:
    """Join retrieved documents into one context string, one block per file."""
    blocks = []
    for d in docs:
        name = d.metadata.get("filename") or "document"
        blocks.append(f"### {name}\n{d.page_content}")
    return "\n\n".join(blocks)


async def main() -> None:
    async with GaiaClient.from_env() as gaia:
        retriever = GaiaExhaustiveRetriever(gaia=gaia, dataset_name="my-dataset", k=8)

        async def retrieve(x: dict):
            return await retriever.ainvoke(x["question"])

        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    "Answer using only the context. If unsure, say you do not have enough information.",
                ),
                (
                    "human",
                    "Context:\n{context}\n\nQuestion: {question}",
                ),
            ]
        )
        # pip install langchain-openai; export OPENAI_API_KEY
        # llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        chain = (
            RunnablePassthrough.assign(docs=RunnableLambda(retrieve))
            | RunnablePassthrough.assign(
                context=lambda x: format_docs(x["docs"]),
                question=lambda x: x["question"],
            )
            | prompt
        )
        # Full RAG: add `| llm | StrOutputParser()` to `chain`, then await chain.ainvoke(...)
        messages = await chain.ainvoke({"question": "Summarize our remote work policy."})
        print(messages)  # ChatPromptValue: ready to send to any chat model


if __name__ == "__main__":
    asyncio.run(main())
Add | llm | StrOutputParser() to chain when a chat model is configured (OPENAI_API_KEY, Azure, Anthropic, etc.). The same structure works with create_retrieval_chain-style helpers in the full langchain package if you prefer higher-level APIs.
Pagination and “get everything”
GaiaExhaustiveRetriever intentionally returns only the first page of exhaustive search (up to k ≤ 100). If you need every matching document (e.g. compliance), page inside your own code (see Exhaustive Search, Paginated Search) or keep retrieval at top-k and run a separate export job.
Sync vs async
| Call site | Use |
|---|---|
| Plain synchronous script (no running event loop) | retriever.invoke(query) may work; the sync path calls asyncio.run(...) internally. |
| FastAPI, async def (including code started with asyncio.run(main())), Jupyter | await retriever.ainvoke(query) |
| LangChain create_retrieval_chain / agents | Prefer async routes so ainvoke propagates cleanly. |
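The guard behind the sync path can be tried in isolation with the standard library alone. The sketch below mirrors the check used in the retriever's sync method; `run_sync`, `fetch`, and `called_inside_loop` are illustrative names that do not come from LangChain or the Gaia SDK.

```python
from __future__ import annotations

import asyncio
from typing import Any, Callable, Coroutine


def run_sync(make_coro: Callable[[], Coroutine[Any, Any, Any]]) -> Any:
    """Run a coroutine from sync code, refusing if a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: safe to start one.
        return asyncio.run(make_coro())
    raise RuntimeError("event loop already running; await the coroutine instead")


async def fetch() -> str:
    await asyncio.sleep(0)  # stand-in for a network call
    return "docs"


async def called_inside_loop() -> str:
    try:
        run_sync(fetch)
    except RuntimeError as exc:
        return f"refused: {exc}"
    return "unexpected"


if __name__ == "__main__":
    print(run_sync(fetch))                    # plain script: no loop yet
    print(asyncio.run(called_inside_loop()))  # inside a loop: the guard raises
```

Passing a factory (`fetch`) rather than an already-created coroutine means nothing is left un-awaited when the guard refuses to run.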
Security and limits
- API keys: use GaiaClient.from_env(); never embed keys in LangChain metadata or prompts.
- Snippets are not full documents: page_content is the search snippet. For full text, fetch the object from storage (same theme as Gaia + Unstructured.io).
- Rate limits: add retries and backoff around exhaustive_search in production.
Implementer checklist
- Install langchain-core and gaia-sdk; pin versions in your service.
- Confirm dataset_name exists and is indexed.
- Use ainvoke in async services; avoid invoke inside running loops.
- Decide whether snippets are enough or you need a second step to load full file text.
- Add observability (structured logs, trace IDs) before production traffic.
LangChain is a trademark of its respective owner. This guide is informational and does not imply partnership or endorsement.