Chapter 64 min read

Chapter 6: Building the LCEL Retrieval Pipeline

In legacy LangChain, developers constructed RAG pipelines using opaque, black-box wrappers like ConversationalRetrievalChain or create_retrieval_chain. While these wrappers are easy to type, they are notoriously difficult to debug and customize in production.

If you need to change how the prompt formats the documents, or if you need to track exactly which document chunk was sent to the LLM to generate a user-facing citation, the black-box wrappers become massive roadblocks.

The modern standard is to build the RAG pipeline from scratch using LangChain Expression Language (LCEL). In this chapter, we will design an advanced, transparent retrieval pipeline.


6.1 The Naive RAG Chain

Let's look at the basic LCEL implementation of RAG. It uses RunnablePassthrough to pipe data into the prompt template.

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# 1. Define the Prompt
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# 2. Helper function to combine documents
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 3. Construct the Chain
naive_rag_chain = (
    # Retrieve docs, format them as a string, and pass the question unchanged
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# 4. Execute
answer = naive_rag_chain.invoke("What is the company leave policy?")
print(answer)

The Problem: The format_docs function converts the List[Document] object into a single massive string before it enters the prompt. The chain returns the string answer, but the original source documents are permanently lost. You cannot show the user which files the AI referenced!


6.2 The Advanced Pattern: RAG with Source Citations

To build a UI that shows "Sources Cited", we must construct an LCEL pipeline that returns a dictionary containing both the final answer AND the raw retrieved documents.

We achieve this by utilizing RunnableParallel to branch the execution, and the .assign() method to compute the answer without destroying the input variables.

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 1. We create a Parallel runner to grab the documents and the question
retrieval_branch = RunnableParallel(
    {
        "context": retriever,               # Returns List[Document]
        "question": RunnablePassthrough()   # Returns the raw string query
    }
)

# 2. We use .assign() to calculate the answer based on the context/question dict.
# .assign() ADDS a new key to the dictionary without removing the existing keys!
rag_chain_with_sources = retrieval_branch.assign(
    answer=(
        # Format the context into a string just for the prompt
        RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
        | prompt
        | model
        | StrOutputParser()
    )
)

Flow Analysis

When you invoke rag_chain_with_sources.invoke("What is the policy?"):

  1. Retrieval Branch: The retriever fetches 3 chunks. The output is a dict: {"context": [Doc1, Doc2, Doc3], "question": "What is the policy?"}.
  2. Assign Block: The pipeline copies that dictionary and routes it into the answer calculation sub-chain.
  3. Prompt Formatting: Inside the sub-chain, format_docs turns the documents into a string so the prompt can compile.
  4. Generation: The LLM generates the response string.
  5. Final Output: The assign block attaches the generated string to the original dictionary.

The final result is a beautiful, fully traceable dictionary:

result = rag_chain_with_sources.invoke("What is the policy?")

print("Answer:", result["answer"])

print("\nSources:")
for doc in result["context"]:
    print(f"- {doc.metadata['source']} (Page {doc.metadata['page']})")

6.3 Asynchronous Execution & Streaming

In modern applications, LLM responses are streamed back to the client token-by-token (like a typewriter) to reduce perceived latency.

Because we built our chain using pure LCEL, we get asynchronous and streaming capabilities automatically, without changing the pipeline code.

import asyncio

async def stream_rag_response():
    # To stream the text of the answer while also getting the sources,
    # we iterate over the asynchronous event stream.
    
    print("Retrieving and generating...\n")
    
    async for chunk in rag_chain_with_sources.astream("What is the policy?"):
        # The chain streams the dictionary keys as they resolve.
        
        # When the retrieval branch finishes, it yields the context.
        if "context" in chunk:
            print("[System] Documents retrieved successfully.")
            
        # As the LLM generates the answer, it yields string chunks.
        if "answer" in chunk:
            print(chunk["answer"], end="", flush=True)

asyncio.run(stream_rag_response())

6.4 What's Next?

We now have a production-grade RAG pipeline that preserves source citations and streams responses.

However, this pipeline still relies on a single vector search query. What if the user's initial question is worded poorly? What if the relevant data is spread across different chunks?

In Chapter 7, we will dive into Advanced Retrieval Algorithms. We will move beyond naive vector lookup and implement query routing, HyDE (Hypothetical Document Embeddings), and Self-Querying mechanisms to drastically increase retrieval precision.

    Chapter 6: Building the LCEL Retrieval Pipeline — Complete RAG In Python | Krishna Tiwari