
Chapter 3: Data Flow Control: Passthroughs, Parallels, and Custom Lambdas

In Chapter 2, we built linear chains where the output of component A flowed directly as the input of component B. While this works for basic text processing, real-world AI pipelines require complex data orchestration. You may need to run multiple checks in parallel, inject custom validation logic, or keep track of the original user input as it travels through several intermediate processing layers.

To manage this, LangChain provides three essential utility runnables:

  1. RunnablePassthrough: Passes input data unchanged, or adds extra keys to a dictionary.
  2. RunnableParallel: Executes multiple processing branches in parallel and combines their results.
  3. RunnableLambda: Converts standard Python functions into custom chain components.

In this chapter, we will master these components and assemble them into a highly efficient, concurrent processing pipeline.


3.1 RunnablePassthrough & Dynamic Key Assignment

When passing data through a chain, you often need to preserve the user's raw input. For example, in a retrieval-augmented generation (RAG) system, the prompt template needs both the retrieved document context and the original query.

If you write a simple pipe, the vector store retrieval step consumes the query and outputs a list of documents, so the original user query is lost before it reaches the prompt template.

RunnablePassthrough solves this. It has two main uses:

A. Passing Data Unchanged

It acts as an identity function.

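from langchain_core.runnables import RunnablePassthrough

# `model` is assumed to be a chat model created earlier, e.g. ChatOllama(model="llama3.2")
chain = RunnablePassthrough() | model
# The input string is passed straight to the model, unchanged.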

B. Adding Key-Value Pairs with .assign()

The class method RunnablePassthrough.assign() takes a dictionary input, computes new keys from it, and returns the original keys together with the new ones, so nothing already in the dictionary is lost.

# Assume the initial input is: {"question": "What is photosynthesis?"}
# `retrieve_docs` is your own retrieval helper (a placeholder, not a LangChain API)
setup_and_retrieval = RunnablePassthrough.assign(
    context=lambda x: retrieve_docs(x["question"])
)

# After running setup_and_retrieval, the output dictionary is:
# {
#   "question": "What is photosynthesis?",
#   "context": "Photosynthesis is the process used by plants..."
# }

This is a clean way to build up the dictionary of parameters required by your downstream prompt templates.


3.2 RunnableParallel: Concurrent Branching

When building web services, latency is a critical metric. If your application needs to classify a message, check it for safety, and search a database, and none of these tasks depends on another, running them sequentially makes the user wait for the sum of all three.

RunnableParallel allows you to execute any number of runnables concurrently on the same input, combining their results into a single dictionary output.

from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Define two independent chains (`model` is a chat model initialized elsewhere)
joke_chain = ChatPromptTemplate.from_template("Tell me a joke about {topic}") | model | StrOutputParser()
poem_chain = ChatPromptTemplate.from_template("Write a poem about {topic}") | model | StrOutputParser()

# Execute them concurrently
parallel_chain = RunnableParallel(
    joke=joke_chain,
    poem=poem_chain
)

# invoke returns a dict: {"joke": "...", "poem": "..."}
result = parallel_chain.invoke({"topic": "programming"})

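Under the hood, invoke() submits each branch to a thread pool, while ainvoke() runs the branches concurrently on the active asyncio event loop. Either way, total latency is roughly that of the slowest branch rather than the sum of all branches. For synchronous calls, you can cap the size of that thread pool with the max_concurrency key of the run config.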


3.3 RunnableLambda: Integrating Custom Python Code

While LangChain provides many built-in components, you will frequently need to write custom logic: sanitizing strings, hitting custom REST APIs, or parsing files.

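RunnableLambda wraps any standard Python function (synchronous or asynchronous) that takes a single input argument and exposes the standard Runnable interface (invoke, batch, stream, and their async variants). This allows it to be piped directly into any LCEL sequence. If your logic needs several values, pass them together in one dictionary.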

from langchain_core.runnables import RunnableLambda

def clean_text(text: str) -> str:
    # Custom cleaning logic
    return text.strip().lower()

# Wrap and chain (`prompt` and `model` are defined as in the earlier examples)
custom_runnable = RunnableLambda(clean_text)
chain = prompt | model | StrOutputParser() | custom_runnable

Async and Configs in RunnableLambda

If your custom logic involves asynchronous operations, you simply define the wrapped function as async def:

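import aiohttp
from langchain_core.runnables import RunnableLambda

async def fetch_external_status(input_data: dict) -> dict:
    # Perform non-blocking async network calls
    async with aiohttp.ClientSession() as session:
        ...  # request logic elided; it should produce `result`
    return result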

async_runnable = RunnableLambda(fetch_external_status)

Furthermore, if your function declares a second parameter named config, LangChain automatically passes in the current RunnableConfig dictionary. From it you can read run tags, metadata, callbacks, and run-time parameters stored under the configurable key.


3.4 Hands-on Example: Concurrent Multi-Aspect Processor

Let's build a professional pipeline that takes a technical topic, generates a formal academic definition, produces comma-separated index tags, and formats the output into a structured report.

We will use:

  • RunnablePassthrough to map inputs.
  • RunnableParallel to execute the definition and tag generation concurrently.
  • RunnableLambda to format and post-process the results.

import asyncio
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
from langchain_ollama import ChatOllama

# 1. Define custom formatting logic inside a Python function
def format_final_report(results: dict) -> str:
    """
    Takes the parallel output dict and compiles it into a markdown report.
    """
    topic = results["topic"]
    definition = results["definition"].strip()
    keywords = results["keywords"].strip()
    
    return f"""# Tech Briefing: {topic.upper()}

## Technical Definition
{definition}

## Index Keywords
`{keywords}`

---
*Report compiled automatically via LCEL concurrent pipeline.*
"""

async def main():
    # 2. Initialize LLM (Swap for ChatOpenAI or ChatGroq if preferred)
    llm = ChatOllama(model="llama3.2", temperature=0.1)
    
    # 3. Create independent processing chains
    definition_chain = (
        ChatPromptTemplate.from_template("Explain the concept of '{topic}' in 2-3 precise sentences.")
        | llm
        | StrOutputParser()
    )
    
    keyword_chain = (
        ChatPromptTemplate.from_template(
            "List exactly 5 keywords or tags for '{topic}' as a comma-separated list. "
            "Return only the tags. Do not write introductory text or explanations."
        )
        | llm
        | StrOutputParser()
    )
    
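    # 4. Construct the parallel orchestrator
    # Its input is already the dict {"topic": ...}, so a bare RunnablePassthrough()
    # would forward the whole dict and break topic.upper(); extract the string instead.
    orchestrator = RunnableParallel(
        topic=RunnableLambda(lambda x: x["topic"]),  # Passes the raw topic string down
        definition=definition_chain,  # Runs definition LLM chain
        keywords=keyword_chain        # Runs keyword LLM chain
    )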
    
    # 5. Build the complete master pipeline
    # - Maps input string to dict format: {"topic": topic}
    # - Runs parallel LLM invocations
    # - Pipes dict result to custom python formatting lambda
    master_chain = (
        {"topic": RunnablePassthrough()}
        | orchestrator
        | RunnableLambda(format_final_report)
    )
    
    # 6. Execute the chain asynchronously
    target_topic = "Serverless Computing"
    print(f"Running master pipeline for topic: '{target_topic}'...")
    
    report = await master_chain.ainvoke(target_topic)
    print("\n--- GENERATED REPORT ---")
    print(report)

if __name__ == "__main__":
    asyncio.run(main())

Why this is better than running the two chains one after another:

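  1. Efficiency: The definition_chain and keyword_chain each make a call to the LLM, and neither depends on the other. Inside a RunnableParallel they execute simultaneously: if each call takes 2 seconds, the total is about 2 seconds instead of 4, provided the model backend can serve both requests at once (a local Ollama server, for example, may queue concurrent requests depending on its configuration).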
  2. Modularity: Every component in the chain is decoupled. You can modify the formatting function, swap the keyword generation prompt, or upgrade the model without breaking the chain's structural definition.

3.5 Summary

You now understand:

  1. How to pass data and construct dictionaries using RunnablePassthrough and .assign().
  2. How to achieve concurrent processing and reduce latency using RunnableParallel.
  3. How to wrap custom Python functions (including async operations) inside a chain using RunnableLambda.

In the next chapter, we will build on this structural understanding by studying Dynamic Chain Routing & Selection. We will learn how to inspect inputs at runtime and conditionally guide them down different execution paths.

    Chapter 3: Data Flow Control: Passthroughs, Parallels, and Custom Lambdas — Mastering LangChain: From Basics to Stateful Agents | Krishna Tiwari