
Chapter 8: RAG the LCEL Way: Document Ingestion, Retrieval, and Generation

A common limitation of LLMs is their static training data. If you ask an LLM about your company's proprietary codebase, yesterday's sales figures, or a software framework released after the model's training cutoff, it will often hallucinate an answer or fail to give one.

RAG (Retrieval-Augmented Generation) solves this by retrieving relevant text documents from a database and injecting them into the LLM's prompt context before calling the API.

LangChain provides high-level helpers such as create_retrieval_chain, but they hide the prompt structure and data flow, which makes the pipeline harder to inspect and customize. In this chapter, we will build a complete RAG system from scratch in pure LCEL so that the retrieved source documents and their metadata remain available for citations.


8.1 The RAG Architecture

A standard RAG system is divided into two distinct pipelines:

  1. Ingestion (Offline): Documents are loaded, split into manageable chunks, converted into numerical vectors (embeddings), and stored in a Vector Database.
  2. Retrieval & Generation (Online): The user asks a question. The system embeds the query, searches the vector database for matching chunks, formats a prompt containing the question and chunks, and submits it to the LLM.

       [ Ingestion Pipeline ]                   [ Retrieval & Generation Pipeline ]
                                                
┌─────────────────────────────────┐                    ┌─────────────────────────┐
│     Source Documents (PDFs)     │                    │      User Query         │
└───────────────┬─────────────────┘                    └────────────┬────────────┘
                │                                                   │
                ▼                                                   ▼
┌─────────────────────────────────┐                    ┌─────────────────────────┐
│        Text Splitting           │                    │     Query Embedding     │
└───────────────┬─────────────────┘                    └────────────┬────────────┘
                │                                                   │
                ▼                                                   ▼
┌─────────────────────────────────┐                    ┌─────────────────────────┐
│      Embedding Generation       │                    │    Vector Similarity    │
└───────────────┬─────────────────┘                    │         Search          │
                │                                      └────────────┬────────────┘
                ▼                                                   │
┌─────────────────────────────────┐                                 │ (Retrieves Docs)
│        Vector Database          │◄────────────────────────────────┘
│        (Chroma/FAISS)           │
└─────────────────────────────────┘
                │
                ▼ (Injected into context)
┌─────────────────────────────────┐
│    Prompt Template & LLM        │
└───────────────┬─────────────────┘
                │
                ▼
┌─────────────────────────────────┐
│          Final Answer           │
└─────────────────────────────────┘
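
To make the two pipelines concrete, here is a minimal sketch that uses only the Python standard library. It is an illustration under loud assumptions, not how FAISS or a real embedding model works: the "embedding" is a bag-of-words count vector, a plain list stands in for the vector database, and the sample chunks and the names embed, cosine, retrieve, and build_prompt are hypothetical.

```python
import math
import re
from collections import Counter

def embed(text: str) -> Counter:
    """Toy 'embedding': a bag-of-words count vector (stand-in for a real model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

# --- Ingestion (offline): embed each chunk and store it ---
chunks = [
    "Refunds are accepted within 30 days of purchase.",
    "Support is available on weekdays from 9 to 5.",
    "Shipping is free for orders above 50 dollars.",
]
index = [(embed(c), c) for c in chunks]  # a list stands in for the vector database

# --- Retrieval & generation (online) ---
def retrieve(query: str, k: int = 1) -> list[str]:
    """Embed the query and return the k most similar stored chunks."""
    q = embed(query)
    ranked = sorted(index, key=lambda pair: cosine(q, pair[0]), reverse=True)
    return [text for _, text in ranked[:k]]

def build_prompt(query: str) -> str:
    """Inject the retrieved chunks into the prompt that would be sent to the LLM."""
    context = "\n".join(retrieve(query))
    return f"Context:\n{context}\n\nQuestion: {query}"

print(build_prompt("How many days do I have to request refunds?"))
```

The ingestion half runs once, ahead of time. The retrieval half runs on every query, and the prompt it builds is the only place where the LLM sees your documents.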

8.2 Ingestion: Loading, Splitting, and Embedding

Let's write an ingestion script. We'll use a text loader, a character splitter, and a local FAISS vector store.

Step 1: Install Ingestion Libraries

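We install FAISS (a vector similarity search library from Meta), the LangChain community integrations, the text splitters, and the OpenAI integration that provides the embeddings used below:

pip install langchain-community langchain-text-splitters langchain-openai faiss-cpu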

Step 2: Chunking & Embedding

We load a text document, break it into chunks of at most 500 characters with a 100-character overlap (so text near a chunk boundary appears in both neighboring chunks), embed the chunks, and store them:

from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
# Alternatively: from langchain_ollama import OllamaEmbeddings

# 1. Load the document
loader = TextLoader("company_policy.txt")
docs = loader.load()

# 2. Split the document into chunks
# RecursiveCharacterTextSplitter tries paragraph breaks first, then line breaks, then spaces,
# and only falls back to splitting mid-word when a piece is still too large.
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
chunks = splitter.split_documents(docs)

# 3. Create Embeddings & Store in FAISS
embeddings = OpenAIEmbeddings() # Or OllamaEmbeddings(model="nomic-embed-text")
vector_db = FAISS.from_documents(chunks, embeddings)

# 4. Expose as a Retriever
# A retriever is a Runnable that accepts a string query and returns a list of Documents.
retriever = vector_db.as_retriever(search_kwargs={"k": 3})
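
To see what chunk_size and chunk_overlap mean, consider this simplified sketch. It is not RecursiveCharacterTextSplitter's algorithm (which splits on separators and then merges pieces, so its chunks are rarely exact fixed widths); it is a plain sliding window, and the function name sliding_chunks is hypothetical.

```python
def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Fixed-width windows; each window starts chunk_size - chunk_overlap after the last."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks

text = "abcdefghijklmnopqrstuvwxyz"
for chunk in sliding_chunks(text, chunk_size=10, chunk_overlap=4):
    print(chunk)
```

Each chunk repeats the last four characters of the previous one, so a phrase that straddles a boundary still appears whole in at least one chunk as long as it is no longer than the overlap.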

8.3 The Pure-LCEL RAG Pipeline

Now that we have a retriever, we must feed the retrieved documents into the LLM. A naive LCEL RAG chain looks like this:

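from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# Naive RAG chain (returns only the final answer string).
# Assumes retriever, format_docs, prompt, and model are already defined.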
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

The problem with the naive approach is that it outputs only a string containing the answer. The raw documents pulled from the database are thrown away. In a production app, you want to show the user the source articles or PDF pages (metadata citations) so they can verify the answer.

The Advanced Citation Pattern

We use RunnableParallel and .assign() to generate a dictionary containing both the final answer and the exact retrieved source documents.

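from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# Assumes retriever, prompt, and model are already defined (see Sections 8.2 and 8.4).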

# 1. Custom formatting function to join document contents
def format_docs(docs) -> str:
    return "\n\n".join(doc.page_content for doc in docs)

# 2. Construct the pipeline
rag_chain_with_sources = RunnableParallel(
    {
        "context": retriever,               # Fetches List[Document]
        "question": RunnablePassthrough()   # Passes the raw user query
    }
).assign(
    # We assign the 'answer' key by executing the prompt | model pipeline.
    # The input to assign is {"context": List[Document], "question": "..."}.
    # We format the 'context' field into a string before feeding the prompt.
    answer=RunnablePassthrough.assign(
        context=lambda x: format_docs(x["context"])
    )
    | prompt
    | model
    | StrOutputParser()
)

When you run rag_chain_with_sources.invoke("What is the refund policy?"), it returns:

{
    "context": [Document(page_content="...", metadata={"source": "..."}), ...],
    "question": "What is the refund policy?",
    "answer": "Our refund policy states that you have 30 days..."
}
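
If the nesting of RunnableParallel and .assign() is hard to follow, the plain-Python analogue below may help. It is a sketch of the data flow only, not LangChain's implementation: fake_retriever, fake_model, parallel_step, and assign_answer are hypothetical stand-ins, and documents are modeled as plain dicts.

```python
# Plain-Python analogue of the citation pattern. All names are hypothetical stand-ins.

def fake_retriever(query: str) -> list[dict]:
    """Stand-in for the retriever: returns document-like dicts."""
    return [
        {"page_content": "Refunds are accepted within 30 days.",
         "metadata": {"source": "policy.txt"}},
    ]

def fake_model(prompt: str) -> str:
    """Stand-in for prompt | model | StrOutputParser()."""
    return f"answer generated from a {len(prompt)}-character prompt"

def format_docs(docs: list[dict]) -> str:
    return "\n\n".join(doc["page_content"] for doc in docs)

def parallel_step(query: str) -> dict:
    # Mirrors RunnableParallel: every key is computed from the same input.
    return {"context": fake_retriever(query), "question": query}

def assign_answer(state: dict) -> dict:
    # Mirrors .assign(answer=...): the inner chain sees a copy whose context
    # is a formatted string, while the outer dict keeps the original documents.
    inner = {**state, "context": format_docs(state["context"])}
    prompt = f"Context:\n{inner['context']}\n\nQuestion: {inner['question']}"
    return {**state, "answer": fake_model(prompt)}

result = assign_answer(parallel_step("What is the refund policy?"))
print(list(result))
print(result["context"][0]["metadata"]["source"])
```

The key point is that formatting happens on a copy inside the answer branch, so the list of documents under "context" survives to the final result and can be shown as citations.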

8.4 Hands-on Example: End-to-End RAG System with Citations

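Let's write a complete, runnable script. It writes a small mock document to disk so there is something to index. It assumes the langchain-ollama package is installed and that a local Ollama server is running with the llama3.2 and nomic-embed-text models pulled.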

import asyncio
import os
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_community.vectorstores import FAISS
from langchain_ollama import ChatOllama, OllamaEmbeddings

# --- 1. Seed Mock Document File ---
mock_document_content = """
Title: Antigravity IDE Architecture Guidelines
Date: June 2026
Guidelines:
1. All AI plugins must register their capabilities in the main configuration schema.
2. Web application development must prioritize HSL color spaces and sleek dark modes.
3. Do not hardcode API keys. Use environmental loaders.
4. The workspace root must remain clean of temporary scratch files.
5. Production builds must run compilation tests before bundling.
"""

with open("guide.txt", "w", encoding="utf-8") as f:
    f.write(mock_document_content)

# --- 2. Ingestion Setup ---
async def setup_rag_retriever():
    from langchain_community.document_loaders import TextLoader
    from langchain_text_splitters import RecursiveCharacterTextSplitter
    
    loader = TextLoader("guide.txt")
    docs = loader.load()
    
    splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=50)
    splits = splitter.split_documents(docs)
    
    # Use local Ollama embeddings via nomic-embed-text (run: ollama pull nomic-embed-text).
    # If that fails, fall back to CPU sentence-transformers embeddings via HuggingFace
    # (requires the sentence-transformers package).
    try:
        embeddings = OllamaEmbeddings(model="nomic-embed-text")
        vectorstore = FAISS.from_documents(splits, embeddings)
    except Exception:
        print("Falling back to HuggingFace CPU embeddings due to missing local Ollama embeddings...")
        from langchain_community.embeddings import HuggingFaceEmbeddings
        embeddings = HuggingFaceEmbeddings()
        vectorstore = FAISS.from_documents(splits, embeddings)
        
    return vectorstore.as_retriever(search_kwargs={"k": 2})

# --- 3. Run Pipeline ---
async def main():
    retriever = await setup_rag_retriever()
    
    # Initialize the local Chat Model
    llm = ChatOllama(model="llama3.2", temperature=0.1)
    
    # Prompt Template
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a technical compliance officer. "
                   "Answer the question using ONLY the provided context. "
                   "If you don't know the answer based on the context, say so.\n\n"
                   "Context:\n{context}"),
        ("human", "{question}")
    ])
    
    # Helper to join document strings
    def format_docs(docs) -> str:
        return "\n\n".join(f"[{doc.metadata.get('source', 'unknown')}]: {doc.page_content}" for doc in docs)
    
    # Assemble the Chain
    rag_chain = RunnableParallel(
        {
            "context": retriever,
            "question": RunnablePassthrough()
        }
    ).assign(
        answer=RunnablePassthrough.assign(
            context=lambda x: format_docs(x["context"])
        )
        | prompt
        | llm
        | StrOutputParser()
    )
    
    # Execute Query
    query = "What color spaces should be prioritized in Web application development?"
    print(f"Querying RAG: '{query}'\n")
    
    result = await rag_chain.ainvoke(query)
    
    print("--- ANSWER ---")
    print(result["answer"])
    
    print("\n--- SOURCES CITED ---")
    for doc in result["context"]:
        print(f" - Source: {doc.metadata.get('source')} | Content: {doc.page_content.strip()}")
        
    # Clean up mock file
    if os.path.exists("guide.txt"):
        os.remove("guide.txt")

if __name__ == "__main__":
    asyncio.run(main())

8.5 Critical RAG Best Practices

  1. Metadata Preservation: When splitting files, ensure source identifiers (like filenames, page numbers, or URLs) are stored in the document's metadata dictionary. This makes debugging citations much easier.
  2. Chunk Overlap: Maintain an overlap (e.g. 10–20% of the chunk size). This reduces the chance that a concept spanning a split boundary is cut in half and missing from both chunks.
  3. Prompt Containment: Always instruct the LLM: "If you do not know the answer based on the context, state that you do not know." This reduces hallucinations by steering the model to rely on the retrieved context rather than its training data, although it does not eliminate them.
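
As an illustration of the metadata point, the sketch below copies a source identifier and a start offset into every chunk so a citation can be rendered later. In LangChain, split_documents already carries the parent document's metadata onto each chunk; this standalone version only shows the idea, and split_with_metadata and cite are hypothetical names.

```python
def split_with_metadata(text: str, source: str, chunk_size: int) -> list[dict]:
    """Fixed-width split that copies the source and start offset into each chunk."""
    return [
        {
            "page_content": text[start:start + chunk_size],
            "metadata": {"source": source, "start_index": start},
        }
        for start in range(0, len(text), chunk_size)
    ]

def cite(chunk: dict) -> str:
    """Render a short citation string from a chunk's metadata."""
    meta = chunk["metadata"]
    return f"{meta['source']} @ char {meta['start_index']}"

text = "Do not hardcode API keys. Use environment loaders."
chunks = split_with_metadata(text, source="guide.txt", chunk_size=26)
for chunk in chunks:
    print(cite(chunk), "->", chunk["page_content"])
```

With the offset stored, a wrong or surprising citation can be traced back to the exact span of the source file it came from.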

8.6 Summary

You now understand:

  1. The architecture of RAG: Ingestion, Retrieval, and Generation.
  2. How to split documents and index them in a vector database using FAISS.
  3. How to construct an advanced RAG pipeline in LCEL that returns both the text answer and the source documents that back it.

In the next chapter, we will address error handling, fallbacks, and callbacks to make our LangChain pipelines resilient against API failures and to record latency metrics.

    Chapter 8: RAG the LCEL Way: Document Ingestion, Retrieval, and Generation — Mastering LangChain: From Basics to Stateful Agents | Krishna Tiwari