Chapter 3: Data Flow Control: Passthroughs, Parallels, and Custom Lambdas
In Chapter 2, we built linear chains where the output of component A flowed directly as the input of component B. While this works for basic text processing, real-world AI pipelines require complex data orchestration. You may need to run multiple checks in parallel, inject custom validation logic, or keep track of the original user input as it travels through several intermediate processing layers.
To manage this, LangChain provides three essential utility runnables:
- RunnablePassthrough: Passes input data unchanged, or adds extra keys to a dictionary.
- RunnableParallel: Executes multiple processing branches in parallel and combines their results.
- RunnableLambda: Converts standard Python functions into custom chain components.
In this chapter, we will master these components and assemble them into a highly efficient, concurrent processing pipeline.
3.1 RunnablePassthrough & Dynamic Key Assignment
When passing data through a chain, you often need to preserve the user's raw input. For example, in a retrieval-augmented generation (RAG) system, the prompt template needs both the retrieved document context and the original query.
If you write a simple pipe, the retrieval step consumes the query and outputs a list of documents. The original user query is therefore lost before it reaches the prompt template.
RunnablePassthrough solves this. It has two main uses:
A. Passing Data Unchanged
It acts as an identity function.
from langchain_core.runnables import RunnablePassthrough
chain = RunnablePassthrough() | model
# The input string is passed straight to the model.
B. Adding Key-Value Pairs with .assign()
The class method RunnablePassthrough.assign() lets you add new keys to a dictionary input dynamically, without destroying the existing keys. Each new value is computed from the incoming dictionary.
# Assume the initial input is: {"question": "What is photosynthesis?"}
# retrieve_docs is a hypothetical helper that returns relevant text for a question
setup_and_retrieval = RunnablePassthrough.assign(
    context=lambda x: retrieve_docs(x["question"])
)
# After running setup_and_retrieval, the output dictionary is:
# {
#     "question": "What is photosynthesis?",
#     "context": "Photosynthesis is the process used by plants..."
# }
This is a clean way to build up the dictionary of parameters required by your downstream prompt templates.
3.2 RunnableParallel: Concurrent Branching
When building web services, latency is a critical metric. If your application needs to classify a message, check it for safety, and search a database, running these tasks sequentially is inefficient.
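RunnableParallel executes any number of runnables concurrently on the same input. It combines their results into a single dictionary, keyed by the names you give each branch. Inside an LCEL pipe, a plain dict of runnables is coerced into a RunnableParallel automatically; the {"topic": RunnablePassthrough()} step in Section 3.4 relies on this.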
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Define two independent chains (model is any chat model instance, e.g. ChatOllama or ChatOpenAI)
joke_chain = ChatPromptTemplate.from_template("Tell me a joke about {topic}") | model | StrOutputParser()
poem_chain = ChatPromptTemplate.from_template("Write a poem about {topic}") | model | StrOutputParser()
# Execute them concurrently
parallel_chain = RunnableParallel(
    joke=joke_chain,
    poem=poem_chain,
)
# invoke returns a dict: {"joke": "...", "poem": "..."}
result = parallel_chain.invoke({"topic": "programming"})
Under the hood, invoke() submits the branches to a thread pool, while ainvoke() schedules them as concurrent tasks on the running event loop. This pays off for I/O-bound steps such as LLM API calls: the total latency is roughly that of the slowest branch rather than the sum of all branches.
3.3 RunnableLambda: Integrating Custom Python Code
While LangChain provides many built-in components, you will frequently need to write custom logic: sanitizing strings, hitting custom REST APIs, or parsing files.
RunnableLambda wraps any standard Python function (synchronous or asynchronous) and exposes the standard Runnable interface, so it can be piped directly into any LCEL sequence. The wrapped function receives a single input argument; pass a dict when a step needs several values.
from langchain_core.runnables import RunnableLambda
def clean_text(text: str) -> str:
    # Custom cleaning logic
    return text.strip().lower()
# Wrap and chain (prompt and model are defined as in the earlier examples)
custom_runnable = RunnableLambda(clean_text)
chain = prompt | model | StrOutputParser() | custom_runnable
Async and Configs in RunnableLambda
If your custom logic involves asynchronous operations, you simply define the wrapped function as async def:
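import aiohttp
from langchain_core.runnables import RunnableLambda
async def fetch_external_status(input_data: dict) -> dict:
    result = dict(input_data)
    # Perform non-blocking async network calls and add their outputs to result
    async with aiohttp.ClientSession() as session:
        ...
    return result
async_runnable = RunnableLambda(fetch_external_status)
# An async-only lambda must be called with: await async_runnable.ainvoke(...)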
Furthermore, if the wrapped function declares a parameter named config, LangChain injects the active RunnableConfig dictionary into it. From the config you can read run tags, metadata, callbacks, and run-time parameters stored under the "configurable" key.
3.4 Hands-on Example: Concurrent Multi-Aspect Processor
Let's build a professional pipeline that takes a technical topic, generates a formal academic definition, produces comma-separated index tags, and formats the output into a structured report.
We will use:
- RunnablePassthrough to map inputs.
- RunnableParallel to execute the definition and tag generation concurrently.
- RunnableLambda to format and post-process the results.
import asyncio
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
from langchain_ollama import ChatOllama
# 1. Define custom formatting logic inside a Python function
def format_final_report(results: dict) -> str:
    """
    Takes the parallel output dict and compiles it into a markdown report.
    """
    topic = results["topic"]
    definition = results["definition"].strip()
    keywords = results["keywords"].strip()
    return f"""# Tech Briefing: {topic.upper()}
## Technical Definition
{definition}
## Index Keywords
`{keywords}`
---
*Report compiled automatically via LCEL concurrent pipeline.*
"""
async def main():
    # 2. Initialize LLM (Swap for ChatOpenAI or ChatGroq if preferred)
    llm = ChatOllama(model="llama3.2", temperature=0.1)
    # 3. Create independent processing chains
    definition_chain = (
        ChatPromptTemplate.from_template("Explain the concept of '{topic}' in 2-3 precise sentences.")
        | llm
        | StrOutputParser()
    )
    keyword_chain = (
        ChatPromptTemplate.from_template(
            "List exactly 5 keywords or tags for '{topic}' as a comma-separated list. "
            "Return only the tags. Do not write introductory text or explanations."
        )
        | llm
        | StrOutputParser()
    )
    # 4. Construct the parallel orchestrator
    # Its input is the dict {"topic": ...}, so every branch receives that dict
    orchestrator = RunnableParallel(
        topic=itemgetter("topic"),    # Extracts the raw topic string from the input dict
        definition=definition_chain,  # Runs definition LLM chain
        keywords=keyword_chain,       # Runs keyword LLM chain
    )
    # 5. Build the complete master pipeline
    # - Maps input string to dict format: {"topic": topic}
    # - Runs parallel LLM invocations
    # - Pipes dict result to custom python formatting lambda
    master_chain = (
        {"topic": RunnablePassthrough()}
        | orchestrator
        | RunnableLambda(format_final_report)
    )
    # 6. Execute the chain asynchronously
    target_topic = "Serverless Computing"
    print(f"Running master pipeline for topic: '{target_topic}'...")
    report = await master_chain.ainvoke(target_topic)
    print("\n--- GENERATED REPORT ---")
    print(report)
if __name__ == "__main__":
    asyncio.run(main())
Why this beats a sequential chain:
- Efficiency: The definition_chain and keyword_chain each make an API call to the LLM. Because they sit inside a RunnableParallel, they run at the same time: if each call takes 2 seconds, the total execution time is about 2 seconds instead of 4. This assumes the model backend can serve requests concurrently. A hosted API usually can, while a local Ollama server may queue them, depending on its configuration.
- Modularity: Every component in the chain is decoupled. You can modify the formatting function, swap the keyword generation prompt, or upgrade the model without changing the chain's structural definition.
3.5 Summary
You now understand:
- How to pass data and construct dictionaries using RunnablePassthrough and .assign().
- How to achieve concurrent processing and reduce latency using RunnableParallel.
- How to wrap custom Python functions (including async operations) inside a chain using RunnableLambda.
In the next chapter, we will build on this structural understanding by studying Dynamic Chain Routing & Selection. We will learn how to inspect inputs at runtime and conditionally guide them down different execution paths.