
Chapter 9: Resilience & Observability: Fallbacks, Retries, and Callbacks

In development environments, LLM APIs appear highly reliable. However, once your application is in production and receives continuous traffic, you will inevitably encounter failures:

  • Rate Limits (HTTP 429): Cloud providers limit the number of tokens or requests you can make per minute.
  • API Downtime: Services like OpenAI, Groq, or Anthropic occasionally experience partial or full outages.
  • Context Exhaustion: If a prompt exceeds the model's context window (for example, when a user submits a very large query), the API rejects the request.

To build production-grade systems, you must design your application for resilience. Furthermore, you need a way to log execution latency, cost, and trace prompts without polluting your application's core logic.
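For transient failures such as rate limits, a common first line of defense is to retry with exponential backoff before giving up on a provider. The sketch below shows the idea in plain Python. The names RateLimitError, retry_with_backoff, and flaky_call are hypothetical and exist only for this illustration; they are not part of any provider SDK. LangChain runnables offer a comparable .with_retry() method.

```python
import random
import time


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's HTTP 429 error."""


def retry_with_backoff(func, max_attempts=3, base_delay=0.01, sleep=time.sleep):
    """Call func(); on RateLimitError, wait base_delay * 2**attempt plus jitter, then retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))


calls = {"count": 0}


def flaky_call():
    """Fails twice with a rate-limit error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RateLimitError("429 Too Many Requests")
    return "ok"


result = retry_with_backoff(flaky_call)
print(result, "after", calls["count"], "attempts")
```

The jitter spreads retries from many clients over time so they do not all hit the API again at the same instant.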

In this chapter, we will master .with_fallbacks() for automatic provider switching and write custom Callbacks to monitor telemetry.


9.1 The Fallback Pattern: .with_fallbacks()

The base Runnable class exposes the .with_fallbacks() method, which lets you define a list of alternative runnables. If the primary runnable raises an exception during execution, LangChain catches the error and tries the fallbacks in sequence.

from langchain_groq import ChatGroq
from langchain_ollama import ChatOllama

primary_model = ChatGroq(model="llama3-8b-8192")
backup_model = ChatOllama(model="llama3.2")

# Bind a fallback to the model
resilient_model = primary_model.with_fallbacks([backup_model])

You are not limited to fallback models; you can define fallbacks for entire chains. For example, if a complex RAG chain fails (perhaps due to vector store connection timeout), you can fall back to a simpler chat chain that answers without retrieval context.
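The chain-level fallback described above can be sketched in plain Python. This is only an illustration of the pattern, not LangChain's implementation: rag_chain, simple_chat_chain, and run_with_fallbacks are hypothetical names, and re-raising the first error when every option fails is a design choice made for this sketch.

```python
def rag_chain(question):
    """Hypothetical retrieval chain whose vector store is unreachable."""
    raise TimeoutError("vector store connection timed out")


def simple_chat_chain(question):
    """Hypothetical simpler chain that answers without retrieval context."""
    return f"(no retrieval) answer to: {question}"


def run_with_fallbacks(primary, fallbacks, question):
    """Try primary, then each fallback in order; re-raise the first error if all fail."""
    first_error = None
    for chain in [primary, *fallbacks]:
        try:
            return chain(question)
        except Exception as exc:
            if first_error is None:
                first_error = exc  # remember the root cause for the caller
    raise first_error


answer = run_with_fallbacks(rag_chain, [simple_chat_chain], "What is RAG?")
print(answer)
```

With real chains, the same intent would be expressed as rag_chain.with_fallbacks([simple_chat_chain]).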

Specifying Error Types

By default, .with_fallbacks() catches all exceptions. You can restrict it to target specific exceptions (e.g., catching only API connection errors while letting schema validation errors fail fast):

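from httpx import ConnectError

# Assumption: primary_chain surfaces httpx.ConnectError directly. Many provider
# SDKs wrap transport errors in their own exception classes, so check which
# type your chain actually raises before relying on this filter.
resilient_chain = primary_chain.with_fallbacks(
    [backup_chain],
    exceptions_to_handle=(ConnectError,)  # Target connection issues specifically
)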
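To make the effect of the exception filter concrete, here is a small plain-Python sketch under the same caveat: it mimics the behavior described above rather than reproducing the library's code. The function call_with_fallbacks and the three helper callables are hypothetical, and the built-in ConnectionError stands in for a provider's connection error.

```python
def call_with_fallbacks(primary, fallbacks, exceptions_to_handle=(Exception,)):
    """Try each callable in order, falling back only on the listed exception types."""
    first_error = None
    for candidate in [primary, *fallbacks]:
        try:
            return candidate()
        except exceptions_to_handle as exc:
            if first_error is None:
                first_error = exc
    raise first_error


def network_down():
    raise ConnectionError("connection refused")


def bad_schema():
    raise ValueError("output failed schema validation")


def backup():
    return "backup answer"


# A connection error is in the handled tuple, so the backup runs.
handled = call_with_fallbacks(
    network_down, [backup], exceptions_to_handle=(ConnectionError,)
)

# A validation error is not in the tuple, so it propagates and the backup is skipped.
try:
    call_with_fallbacks(bad_schema, [backup], exceptions_to_handle=(ConnectionError,))
    failed_fast = False
except ValueError:
    failed_fast = True

print(handled, failed_fast)
```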

9.2 Callbacks: Telemetry and Logging

Scattering print statements (print("Step A started...")) through your core functions is an anti-pattern. It couples telemetry to business logic and makes both harder to maintain.

LangChain provides a robust Callback System (defined in langchain_core.callbacks). By inheriting from BaseCallbackHandler, you can hook into the life cycle of every runnable execution.

Here are the primary hooks available:

  • on_chat_model_start: Triggered when a chat model begins a run and receives its input messages.
  • on_llm_end: Triggered when the model finishes generating text.
  • on_chain_start: Triggered when a chain or runnable (such as a RunnableSequence) begins, including its nested steps.
  • on_tool_start / on_tool_end: Triggered when a tool is called.
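The idea behind these hooks is the observer pattern: the core function announces lifecycle events, and handlers decide what to do with them. The following plain-Python sketch illustrates that separation. BaseHandler, RecordingHandler, and run_step are hypothetical names invented for this example and are much simpler than LangChain's callback manager.

```python
class BaseHandler:
    """Hypothetical base class whose hooks do nothing by default."""

    def on_start(self, name):
        pass

    def on_end(self, name, result):
        pass

    def on_error(self, name, error):
        pass


class RecordingHandler(BaseHandler):
    """Overrides only the hooks it cares about."""

    def __init__(self):
        self.events = []

    def on_start(self, name):
        self.events.append(("start", name))

    def on_end(self, name, result):
        self.events.append(("end", name))


def run_step(name, func, value, handlers=()):
    """Run func(value), notifying every handler before and after."""
    for handler in handlers:
        handler.on_start(name)
    try:
        result = func(value)
    except Exception as exc:
        for handler in handlers:
            handler.on_error(name, exc)
        raise
    for handler in handlers:
        handler.on_end(name, result)
    return result


recorder = RecordingHandler()
output = run_step("uppercase", str.upper, "hello", handlers=[recorder])
print(output, recorder.events)
```

The function being run (str.upper here) contains no logging code at all; the telemetry lives entirely in the handler.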

Creating a Custom Callback Handler

import time
from langchain_core.callbacks import BaseCallbackHandler

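class PerformanceMonitoringHandler(BaseCallbackHandler):
    def __init__(self):
        self.start_time = None

    def on_chain_start(self, serialized, inputs, **kwargs):
        # This hook also fires for nested steps (prompt, parser, ...),
        # so only start the clock for the top-level run.
        if kwargs.get("parent_run_id") is None:
            self.start_time = time.time()
            print("[Telemetry] Pipeline execution initiated...")

    def on_llm_end(self, response, **kwargs):
        print("[Telemetry] Model finished generating.")
        if self.start_time is not None:
            latency = time.time() - self.start_time
            print(f" -> Latency: {latency:.4f} seconds")
        # llm_output can be None for some providers, so guard the lookup.
        usage = (response.llm_output or {}).get("token_usage")
        if usage:
            print(f" -> Tokens: {usage}")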

To run this handler, you pass it to the .invoke() configuration block at runtime:

config = {"callbacks": [PerformanceMonitoringHandler()]}
chain.invoke("Hello", config=config)
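A handler like the one above reports token_usage for a single call. To track consumption or cost over time, those dictionaries can be accumulated. The sketch below assumes the prompt_tokens and completion_tokens keys are present when usage is reported; the UsageTotals class and the per-1,000-token prices are hypothetical and chosen only to make the arithmetic easy to follow.

```python
from dataclasses import dataclass


@dataclass
class UsageTotals:
    """Running totals built from token_usage dictionaries."""

    prompt_tokens: int = 0
    completion_tokens: int = 0
    calls: int = 0

    def add(self, token_usage):
        """Accumulate one token_usage dict; a missing dict or key counts as 0."""
        token_usage = token_usage or {}
        self.prompt_tokens += token_usage.get("prompt_tokens") or 0
        self.completion_tokens += token_usage.get("completion_tokens") or 0
        self.calls += 1

    def estimated_cost(self, prompt_price_per_1k, completion_price_per_1k):
        """Estimate cost from per-1,000-token prices supplied by the caller."""
        return (
            self.prompt_tokens / 1000 * prompt_price_per_1k
            + self.completion_tokens / 1000 * completion_price_per_1k
        )


totals = UsageTotals()
totals.add({"prompt_tokens": 1000, "completion_tokens": 500})
totals.add(None)  # e.g. a provider that reported no usage
totals.add({"prompt_tokens": 200})

# Hypothetical prices, not any provider's real rates.
cost = totals.estimated_cost(prompt_price_per_1k=0.5, completion_price_per_1k=1.5)
print(totals, round(cost, 2))
```

Inside a callback handler, add() would be called from on_llm_end with the usage dictionary taken from the response.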

9.3 Hands-on Example: Resilient Multi-Provider Pipeline

Let's build a resilient script. We'll set up a primary LLM (simulating a cloud provider like Groq) and a fallback model (a local Ollama model). We'll attach a custom logging handler that tracks the execution times and captures details if a failover happens.

import asyncio
import time
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq
from langchain_ollama import ChatOllama

# --- 1. Define telemetry Callback Handler ---
class TelemetryCallbackHandler(BaseCallbackHandler):
    def __init__(self) -> None:
        # run_id -> (model class name, start timestamp). Keying by run_id keeps
        # the primary attempt and the fallback attempt separate.
        self.start_times: Dict[UUID, Tuple[str, float]] = {}

    def on_llm_start(
        self, serialized: Optional[Dict[str, Any]], prompts: List[str],
        *, run_id: UUID, **kwargs: Any
    ) -> None:
        # Chat models are routed here too when on_chat_model_start is not overridden.
        model_name = (serialized or {}).get("id", ["LLM"])[-1]
        self.start_times[run_id] = (model_name, time.perf_counter())
        print(f"\n[Telemetry] Starting LLM execution: '{model_name}'")

    def on_llm_end(self, response, *, run_id: UUID, **kwargs: Any) -> None:
        # Look up the matching start record for this run to calculate latency
        model_name, start_time = self.start_times.pop(
            run_id, ("Unknown Model", time.perf_counter())
        )
        elapsed = time.perf_counter() - start_time
        print(f"[Telemetry] LLM finished: '{model_name}'")
        print(f" -> Latency: {elapsed:.2f} seconds")

        # Token metrics are optional: llm_output may be None or lack token_usage
        usage = (response.llm_output or {}).get("token_usage")
        if usage:
            print(f" -> Prompt Tokens: {usage.get('prompt_tokens')}")
            print(f" -> Completion Tokens: {usage.get('completion_tokens')}")

    def on_llm_error(self, error: BaseException, *, run_id: UUID, **kwargs: Any) -> None:
        # Drop the start record of the failed run so the dict does not grow
        self.start_times.pop(run_id, None)
        print(f"\n[Telemetry ALERT] Model execution failed! Error: {error}")

# --- 2. Build Resilient Chain ---
async def main():
    # Primary model (We configure it with a fake API key to guarantee a failure)
    # This simulates a cloud API going down.
    primary_model = ChatGroq(
        model="llama3-8b-8192", 
        groq_api_key="gsk_fake_key_to_force_failure",
        max_retries=0 # Fail instantly for test purposes
    )
    
    # Fallback model: Local Ollama
    fallback_model = ChatOllama(model="llama3.2", temperature=0.1)
    
    # Create the resilient model definition
    resilient_model = primary_model.with_fallbacks([fallback_model])
    
    # Prompt & chain
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an AI research assistant. Provide concise definitions."),
        ("human", "What is the core concept behind {topic}?")
    ])
    
    chain = prompt | resilient_model | StrOutputParser()
    
    # 3. Invoke with custom callbacks
    config = {"callbacks": [TelemetryCallbackHandler()]}
    
    print("Initiating resilient prompt invocation...")
    try:
        response = await chain.ainvoke(
            {"topic": "Quantum Computing"}, 
            config=config
        )
        print("\n--- FINAL ANSWER ---")
        print(response)
    except Exception as e:
        print(f"Fatal chain failure: {e}")

if __name__ == "__main__":
    asyncio.run(main())

Flow Analysis of the Execution:

  1. Start Hook: The pipeline starts. The prompt formats the input.
  2. Primary Try: The prompt is sent to ChatGroq. The model triggers on_llm_start.
  3. Failure: Because the API key is invalid, the request fails with an authentication error (HTTP 401) rather than a network outage. The handler triggers on_llm_error.
  4. Fallback Trap: LangChain intercepts the exception, suppresses it, and immediately reruns the prompt using the backup ChatOllama model.
  5. Ollama Execution: The callback triggers on_llm_start for ChatOllama.
  6. Success: The local model completes generation successfully. The callback prints latency metrics, and the output text is returned to the caller.

The end user is unaware that the call to the cloud service failed. They simply receive their answer, while your logs record the details of the failure.
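The order of events in this flow can be traced with a small plain-Python simulation. It is a sketch of the sequence only: primary_model, fallback_model, and invoke_with_telemetry are hypothetical stand-ins, the event strings merely borrow the hook names used in this chapter, and no LangChain code is involved.

```python
events = []


def primary_model(prompt):
    """Hypothetical cloud model that always fails authentication."""
    raise PermissionError("401 invalid API key")


def fallback_model(prompt):
    """Hypothetical local model that always succeeds."""
    return f"local answer for: {prompt}"


def invoke_with_telemetry(models, prompt):
    """Try each (name, model) pair in order and record lifecycle events."""
    first_error = None
    for name, model in models:
        events.append(f"on_llm_start:{name}")
        try:
            result = model(prompt)
        except Exception as exc:
            events.append(f"on_llm_error:{name}")
            if first_error is None:
                first_error = exc
            continue  # suppress the error and move on to the next model
        events.append(f"on_llm_end:{name}")
        return result
    raise first_error


answer = invoke_with_telemetry(
    [("primary", primary_model), ("fallback", fallback_model)],
    "What is the core concept behind Quantum Computing?",
)
print(events)
print(answer)
```

The caller only sees the returned answer; the error from the primary model exists solely in the recorded events.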


9.4 Summary

You now understand:

  1. How to design resilient pipelines using .with_fallbacks().
  2. How to intercept life cycle milestones using BaseCallbackHandler.
  3. How to calculate latency and token consumption metrics.

In our final chapter, Chapter 10: Advanced Runnables & Production Deployment with LangServe, we will explore intermediate event streaming using astream_events and see how to deploy our final chains as production REST endpoints.

    Chapter 9: Resilience & Observability: Fallbacks, Retries, and Callbacks — Mastering LangChain: From Basics to Stateful Agents | Krishna Tiwari