State graphs, nodes, edges, checkpointing, interrupt/resume, and multi-agent orchestration with LangGraph.
| Concept | Description |
|---|---|
| State (TypedDict) | Defines the data schema that flows through the entire graph |
| Nodes | Python functions that receive state, perform work, and return partial state updates |
| Edges | Connections between nodes — can be fixed or conditional (branching logic) |
| Graph | The complete workflow: nodes + edges, compiled into a runnable application |
| Checkpointer | Persistence layer that saves state at every step for resumable runs |
| Channels | Control how state keys are merged (append, overwrite, custom reducer) |
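The concepts above can be sketched without LangGraph at all. The toy runner below (all names hypothetical, not LangGraph's API) shows the core idea: nodes are functions that return partial state updates, edges are a next-node map, and the default channel behavior is overwrite-on-merge.

```python
def greet(state):
    return {"message": f"hello {state['name']}"}  # partial update

def shout(state):
    return {"message": state["message"].upper()}

nodes = {"greet": greet, "shout": shout}
edges = {"greet": "shout", "shout": None}  # None plays the role of END

def run_graph(state, entry="greet"):
    node = entry
    while node is not None:
        update = nodes[node](state)   # node returns only the keys it changes
        state = {**state, **update}   # default channel: overwrite on merge
        node = edges[node]            # follow the fixed edge
    return state

result = run_graph({"name": "alice"})
```

LangGraph adds reducers, conditional edges, checkpointing, and streaming on top of this basic shape.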
| Feature | LangChain Chains | LangGraph |
|---|---|---|
| Topology | Linear (A → B → C) | Cyclic, branching, arbitrary graph |
| Loops | Not natively supported | First-class support with conditional edges |
| State | Implicit (passed through chain) | Explicit TypedDict with reducers |
| Branching | Limited (if/else in prompts) | Conditional edges based on state |
| Persistence | Manual / session-based | Built-in checkpointing per step |
| Human-in-the-loop | Manual implementation | Native interrupt() and resume |
| Multi-agent | Complex to orchestrate | Subgraphs, supervisor patterns |
| Streaming | astream_events | stream_mode: values, updates, messages, tokens |
| Debugging | LangSmith traces | LangSmith + LangGraph Studio visual debugging |
| Use Case | Recommended |
|---|---|
| Simple prompt → response | LangChain Chain or LCEL (simpler) |
| Multi-step with branching logic | LangGraph |
| Agent loops (ReAct, plan-and-execute) | LangGraph |
| Multi-agent orchestration | LangGraph (subgraphs) |
| Human approval workflows | LangGraph (interrupt/resume) |
| Stateful long-running conversations | LangGraph (checkpointing) |
| Complex RAG with iterative retrieval | LangGraph |
| Chatbots with tool calling | LangGraph (create_react_agent) |
The langgraph.prebuilt module provides quick-start agents like create_react_agent. Always use TypedDict for state — never plain dicts — and add Annotated reducers for list-type channels.
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage
# ── Basic State ──
class State(TypedDict):
    messages: list[BaseMessage]
    documents: list[str]
    current_step: str
    next_action: str
# ── State with Annotated Reducers ──
class AgentState(TypedDict):
    # add_messages APPENDS new messages (doesn't overwrite)
    messages: Annotated[list[BaseMessage], add_messages]
    # Regular fields are OVERWRITTEN by default
    documents: list[str]
    current_step: str
    retry_count: int
| Channel Type | Behavior | Example |
|---|---|---|
| Default (no annotation) | Overwrites previous value | current_step: str |
| add_messages | Appends messages, deduplicates by ID | messages: Annotated[list, add_messages] |
| operator.add | Merges lists (extends) | documents: Annotated[list, operator.add] |
| Custom reducer fn | Any function(old, new) → merged | Annotated[list, my_custom_reducer] |
import operator
from typing import Annotated, TypedDict
def merge_docs(existing: list, new: list) -> list:
    """Deduplicate documents by content."""
    existing_set = set(existing)
    merged = list(existing)
    for doc in new:
        if doc not in existing_set:
            merged.append(doc)
            existing_set.add(doc)
    return merged
class ResearchState(TypedDict):
    messages: Annotated[list, add_messages]
    documents: Annotated[list[str], merge_docs]
    queries: Annotated[list[str], operator.add]
    max_depth: int
| Practice | Reason |
|---|---|
| Always use TypedDict | Type safety, IDE autocomplete, validation |
| Use Annotated for lists | Prevents accidental overwrite of accumulated data |
| Keep state minimal | Only include what nodes actually need |
| Avoid mutable defaults | Don't use [] or {} as default values |
| Use descriptive key names | current_step, next_action, retry_count vs step, action, count |
| Separate message channels | messages channel with add_messages, other data separately |
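A custom reducer is just a function(old, new) → merged, so it can be exercised directly in plain Python. The sketch below repeats the merge_docs reducer from above (self-contained, no LangGraph needed) and shows how successive node returns accumulate:

```python
def merge_docs(existing: list, new: list) -> list:
    """Deduplicate documents by content (same reducer as above)."""
    seen = set(existing)
    merged = list(existing)
    for doc in new:
        if doc not in seen:
            merged.append(doc)
            seen.add(doc)
    return merged

# Two successive "node returns" merged into the channel
docs = merge_docs([], ["doc-a", "doc-b"])
docs = merge_docs(docs, ["doc-b", "doc-c"])  # doc-b is deduplicated
```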
The add_messages reducer is special: it understands langchain_core.messages and will remove messages with matching IDs (for updates/deletes). For any list that should accumulate over time, always use Annotated[list, reducer] — plain lists get overwritten by each node return.
from langchain_core.messages import AIMessage, HumanMessage
# ── Synchronous Node ──
def chatbot(state: State):
    response = model.invoke(state["messages"])
    return {"messages": [response]}
# ── Async Node (preferred for I/O) ──
async def researcher(state: State):
    query = state["messages"][-1].content
    docs = await vectorstore.asimilarity_search(query)
    return {"documents": [d.page_content for d in docs]}
# ── Node with Conditional Logic ──
def router(state: State):
    last = state["messages"][-1].content.lower()
    if "code" in last:
        return {"current_step": "code_review"}
    return {"current_step": "general"}
| Rule | Description |
|---|---|
| Input | Receives the full State TypedDict as a single argument |
| Output | Returns a dict with partial state updates (keys must match State fields) |
| Can be sync or async | async preferred for network I/O (LLM calls, DB queries) |
| Side effects | Nodes CAN perform side effects (API calls, DB writes) |
| Return format | Return {"key": value} — only include keys you want to update |
| Multiple updates | Return multiple keys: {"messages": [...], "current_step": "done"} |
| No partial types | Return values must match the types defined in State TypedDict |
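The merge behavior behind these rules can be sketched in plain Python (a toy model, not LangGraph's implementation): keys with a reducer are merged, all others are overwritten, and omitted keys pass through untouched.

```python
import operator

def apply_update(state: dict, update: dict, reducers: dict) -> dict:
    """Merge a node's partial return into the running state."""
    merged = dict(state)
    for key, new_value in update.items():
        reduce_fn = reducers.get(key)
        if reduce_fn:
            merged[key] = reduce_fn(state[key], new_value)  # reducer channel
        else:
            merged[key] = new_value                         # default: overwrite
    return merged

state = {"messages": ["hi"], "current_step": "start"}
reducers = {"messages": operator.add}  # append-style channel
state = apply_update(state, {"messages": ["hello!"], "current_step": "done"}, reducers)
# messages accumulated, current_step overwritten
```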
If a node returns {} (an empty dict), the state passes through unchanged. If a node returns {"current_step": "done"}, only that field is updated — everything else stays the same. Reducers like add_messages handle the merging logic.
from langgraph.graph import StateGraph, START, END
graph = StateGraph(State)
# ── 1. Normal Edge (fixed connection) ──
graph.add_edge("node_a", "node_b")
graph.add_edge("node_b", END)
graph.add_edge(START, "node_a")
# ── 2. Conditional Edge (branching) ──
def route(state: State) -> Literal["tools", "end"]:
    last = state["messages"][-1]
    if hasattr(last, "tool_calls") and last.tool_calls:
        return "tools"
    return "end"
graph.add_conditional_edges(
    "agent",                        # source node
    route,                          # routing function
    {"tools": "tools", "end": END}  # optional name mapping
)
# ── 3. Multiple conditional targets ──
def supervisor(state) -> Literal["researcher", "writer", END]:
    msg = state["messages"][-1].content.lower()
    if "research" in msg:
        return "researcher"
    if "write" in msg:
        return "writer"
    return END
graph.add_conditional_edges("supervisor", supervisor)
| Type | Method | Use Case | Example |
|---|---|---|---|
| Normal | add_edge(from, to) | Linear flow between two nodes | A → B → C |
| Conditional | add_conditional_edges(from, fn) | Branching based on state | Router → Tools or END |
| START | add_edge(START, node) | Entry point of the graph | START → agent |
| END | add_edge(node, END) | Terminal node | response → END |
| Map-reduce | Send(node, state) objects | Fan out to many node runs, merge results | Parallel processing |
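In LangGraph, the map-reduce row is expressed by returning Send(...) objects from a routing function and merging results through an operator.add channel. The framework-free sketch below shows the same fan-out/reduce shape in plain Python (summarize is a hypothetical stand-in worker):

```python
import operator

def summarize(topic: str) -> list[str]:
    # Each branch returns a partial result list
    return [f"summary:{topic}"]

def map_reduce(topics: list[str]) -> list[str]:
    results: list[str] = []
    for topic in topics:                               # fan-out: one branch per item
        results = operator.add(results, summarize(topic))  # reduce via operator.add
    return results

merged = map_reduce(["graphs", "agents"])
```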
Type-hint routing functions with Literal["node_a", "node_b", END] for type safety. The optional path_map parameter maps return values to node names — useful when the routing function returns keys different from node names.
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage
class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    documents: list[str]
    current_step: str
# Define nodes
def agent(state: State):
    response = model.invoke(state["messages"])
    return {"messages": [response]}
def retriever(state: State):
    query = state["messages"][-1].content
    docs = vectorstore.similarity_search(query)
    return {"documents": [d.page_content for d in docs]}
def should_retrieve(state: State) -> Literal["retriever", END]:
    if state["current_step"] == "retrieve":
        return "retriever"
    return END
# Build graph
graph = StateGraph(State)
graph.add_node("agent", agent)
graph.add_node("retriever", retriever)
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", should_retrieve)
graph.add_edge("retriever", END)
# Compile into runnable
app = graph.compile()
# Run
result = app.invoke({
    "messages": [HumanMessage(content="What is LangGraph?")]
})
| Step | Method | Description |
|---|---|---|
| 1. Define state | class State(TypedDict) | Create your data schema with reducers |
| 2. Create graph | StateGraph(State) | Instantiate with state type |
| 3. Add nodes | graph.add_node(name, fn) | Register node functions |
| 4. Add edges | graph.add_edge / add_conditional_edges | Wire nodes together |
| 5. Connect START/END | graph.add_edge(START, ...) | Define entry and exit points |
| 6. Compile | graph.compile() | Produces a Runnable (invoke, stream, batch) |
| 7. Run | app.invoke(input, config) | Execute the compiled graph |
graph.compile() validates the graph structure. It checks for unreachable nodes, missing entry points, and type mismatches. Always compile before running. The compiled app is a LangChain Runnable, so it supports invoke(), stream(), batch(), and astream_events().
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.base import BaseCheckpointSaver
# ── 1. In-Memory (development/testing) ──
memory = MemorySaver()
app = graph.compile(checkpointer=memory)
# ── 2. SQLite (production, lightweight) ──
app = graph.compile(
    checkpointer=SqliteSaver.from_conn_string("checkpoints.db")
)
# ── 3. PostgreSQL (production, scalable) ──
app = graph.compile(
    checkpointer=PostgresSaver.from_conn_string(
        "postgresql://user:pass@localhost:5432/langgraph"
    )
)
# ── 4. Async Checkpointer ──
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
app = graph.compile(
    checkpointer=AsyncPostgresSaver.from_conn_string(conn_str)
)
# ── Thread-scoped conversation ──
config = {"configurable": {"thread_id": "user-session-123"}}
# First turn
result1 = app.invoke(
    {"messages": [HumanMessage(content="Hi, I'm Alice")]},
    config
)
# Second turn (remembers context from thread)
result2 = app.invoke(
    {"messages": [HumanMessage(content="What's my name?")]},
    config  # same thread_id → remembers Alice
)
# Different thread → fresh conversation
new_config = {"configurable": {"thread_id": "new-session"}}
result3 = app.invoke(
    {"messages": [HumanMessage(content="Hi")]},
    new_config  # doesn't remember Alice
)
# ── Get current state without running ──
state = app.get_state(config)
print(state.values)    # full state snapshot
print(state.next)      # next nodes to execute
print(state.config)    # config metadata
print(state.metadata)  # timestamps, run IDs
# ── Browse checkpoint history ──
for state in app.get_state_history(config):
    print(f"Step: {state.metadata['step']}")
    print(f"Values: {state.values}")
    print(f"Next: {state.next}")
    print("---")
# ── Replay from a past checkpoint ──
past_state = list(app.get_state_history(config))[3]
app.invoke(None, past_state.config)
# ── Update state manually ──
app.update_state(
    config,
    {"messages": [HumanMessage(content="new input")]},
    as_node="agent"  # which node to resume from
)
| Checkpointer | Persistence | Use Case | Async |
|---|---|---|---|
| MemorySaver | In-memory only | Development, testing, demos | No |
| SqliteSaver | File-based | Small production apps, local dev | Yes |
| PostgresSaver | PostgreSQL database | Production, multi-process | Yes |
| Custom (BaseCheckpointSaver) | Your storage backend | Redis, S3, custom DB | Optional |
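Thread scoping is easy to picture as a dictionary of saved states keyed by thread_id. The toy sketch below (plain Python, not LangGraph's checkpointer API) shows why the same thread_id resumes a conversation while a new one starts fresh:

```python
checkpoints: dict[str, dict] = {}

def invoke_with_memory(update: dict, thread_id: str) -> dict:
    # Load the last checkpoint for this thread (or start fresh)
    state = checkpoints.get(thread_id, {"messages": []})
    # add_messages-style channel: append rather than overwrite
    state = {"messages": state["messages"] + update["messages"]}
    checkpoints[thread_id] = state  # persist the new checkpoint
    return state

invoke_with_memory({"messages": ["Hi, I'm Alice"]}, "t1")
same_thread = invoke_with_memory({"messages": ["What's my name?"]}, "t1")
new_thread = invoke_with_memory({"messages": ["Hi"]}, "t2")
```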
Use thread_id to scope conversations — the same thread always resumes where it left off. Without a checkpointer, every invoke() starts fresh.
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
def approval_node(state: State):
    # Pauses execution here and returns the value
    # to the caller. The graph state is checkpointed.
    user_decision = interrupt({
        "question": "Approve this action?",
        "action": state["proposed_action"],
        "details": state["action_details"]
    })
    # After resume, user_decision contains the human's response
    if user_decision == "approved":
        return {"status": "executed", "result": "Action completed"}
    return {"status": "rejected", "result": "Action cancelled"}
# ── Compile with a checkpointer (required for interrupt()) ──
app = graph.compile(checkpointer=memory)
# ── Static breakpoints (alternative to dynamic interrupt()) ──
app = graph.compile(checkpointer=memory, interrupt_before=["approval_node"])
app = graph.compile(checkpointer=memory, interrupt_after=["agent"])
from langgraph.types import Command
# ── Step 1: Run until interrupt ──
config = {"configurable": {"thread_id": "review-001"}}
result = app.invoke(
    {"messages": [HumanMessage(content="Delete all records")]},
    config
)
# Returns at the interrupt point with the interrupt value
# Graph is paused, state is saved
# ── Step 2: Human reviews (in your application layer) ──
state = app.get_state(config)
print(state.tasks[0].interrupts[0].value)
# {"question": "Approve this action?", ...}
# ── Step 3: Resume with human's decision ──
result = app.invoke(
    Command(resume="approved"),
    config  # same thread_id!
)
# Graph continues from where it paused
# user_decision variable receives "approved"
| Strategy | Method | When to Use |
|---|---|---|
| interrupt_before | compile(interrupt_before=["node"]) | Pause BEFORE a node executes |
| interrupt_after | compile(interrupt_after=["node"]) | Pause AFTER a node completes |
| interrupt() call | Inside a node function | Dynamic pause with custom data to display |
| Command(resume=value) | invoke(Command(resume=val)) | Resume with human-provided value |
| Pattern | Description | Example |
|---|---|---|
| Approval gate | Pause before destructive actions | Data deletion, API writes, payments |
| Input collection | Pause to ask for missing info | Form fields, file uploads |
| Review & edit | Let user modify agent output | Edit generated code/text before commit |
| Error escalation | Pause when agent is uncertain | Low-confidence decisions, hallucination risk |
| Step-by-step | Manual control over each step | Complex workflows needing oversight |
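The pause-and-resume semantics of interrupt() resemble a Python generator: execution stops at a yield point, hands a payload to the caller, and later resumes with a value the caller sends back. A toy analogy (not LangGraph's implementation):

```python
def approval_flow(action: str):
    # Pauses here and hands a payload to the caller, like interrupt()
    decision = yield {"question": "Approve this action?", "action": action}
    # Resumes here with the value passed to send(), like Command(resume=...)
    yield "executed" if decision == "approved" else "rejected"

flow = approval_flow("delete records")
payload = next(flow)            # run until the pause point
status = flow.send("approved")  # resume with the human's decision
```

The real thing adds durability: the paused state is checkpointed, so the resume can happen in a different process, hours later.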
The interrupt() function saves the graph state and returns control to your application. When you call invoke(Command(resume=value), config) with the same thread_id, execution continues from exactly where it paused. This is the foundation for production AI systems where humans need to approve or correct agent actions.
from typing import Literal
from langgraph.graph import StateGraph, START, END
def supervisor(state: State) -> Literal["researcher", "writer", "reviewer", END]:
    """Central routing function that dispatches to specialist agents."""
    step = state.get("current_step", "")
    if step == "research":
        return "researcher"
    elif step == "write":
        return "writer"
    elif step == "review":
        return "reviewer"
    return END
def researcher(state: State):
    docs = search_tool.invoke(state["messages"][-1].content)
    return {"documents": docs, "current_step": "write"}
def writer(state: State):
    draft = model.invoke(f"Write based on: {state['documents']}")
    return {"messages": [draft], "current_step": "review"}
def reviewer(state: State):
    feedback = model.invoke(f"Review: {state['messages'][-1].content}")
    if "approved" in feedback.content.lower():
        return {"current_step": "done"}
    return {"current_step": "write"}  # loop back
graph = StateGraph(State)
# The hub node is a no-op pass-through; supervisor() drives the conditional edges
graph.add_node("supervisor", lambda state: {})
for name, fn in [("researcher", researcher), ("writer", writer), ("reviewer", reviewer)]:
    graph.add_node(name, fn)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", supervisor)
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("reviewer", "supervisor")
from langgraph.prebuilt import create_react_agent
# ── Create specialized agents ──
research_agent = create_react_agent(model, search_tools, prompt="You are a researcher...")
code_agent = create_react_agent(model, code_tools, prompt="You are a coder...")
writer_agent = create_react_agent(model, [], prompt="You are a writer...")
# ── Swarm: agents hand off to each other via tool calls ──
from langchain_core.tools import tool
@tool
def transfer_to_research(query: str) -> str:
    """Transfer to the research agent."""
    return f"Please research: {query}"
@tool
def transfer_to_coder(task: str) -> str:
    """Transfer to the coding agent."""
    return f"Please code: {task}"
# Each agent has handoff tools to delegate work
supervisor_agent = create_react_agent(
    model,
    [transfer_to_research, transfer_to_coder, ...],
    prompt="Route tasks to the appropriate specialist."
)
| Pattern | Topology | Best For | Complexity |
|---|---|---|---|
| Supervisor | Star (hub-and-spoke) | Controlled delegation, review cycles | Medium |
| Swarm / Handoffs | Peer-to-peer via tools | Flexible delegation, ad-hoc collaboration | Low-Medium |
| Hierarchical | Nested supervisors | Large orgs, deep specialist chains | High |
| Parallel (map-reduce) | Fan-out, then merge | Independent tasks, data processing | Medium |
| Blackboard | Shared state, event-driven | Collaborative problem solving | High |
class MultiAgentState(TypedDict):
    messages: Annotated[list, add_messages]
    # Shared data between agents
    research_findings: Annotated[list[str], operator.add]
    code_artifacts: Annotated[list[str], operator.add]
    review_feedback: list[str]
    current_agent: str
    task_status: str
# All nodes share the same state:
# Researcher writes to research_findings
# Coder reads research_findings, writes to code_artifacts
# Reviewer reads code_artifacts, writes to review_feedback
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
# Define tools
@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    return web_search.run(query)
@tool
def calculator(expression: str) -> float:
    """Evaluate a math expression."""
    # Demo only — eval on untrusted input is unsafe; use a math parser in production
    return eval(expression)
# Create a complete ReAct agent in one line
model = ChatOpenAI(model="gpt-4o")
agent = create_react_agent(model, [search_web, calculator])
# Use it like any compiled graph
result = agent.invoke(
    {"messages": [HumanMessage(content="What is 15% of 2340?")]}
)
# With memory
from langgraph.checkpoint.memory import MemorySaver
agent = create_react_agent(
    model,
    [search_web, calculator],
    checkpointer=MemorySaver()  # enable persistence
)
config = {"configurable": {"thread_id": "demo"}}
result = agent.invoke({"messages": [HumanMessage(content="Hi")]}, config)
from langgraph.prebuilt import ToolNode
# ── ToolNode auto-routes tool calls to the right tool ──
tool_node = ToolNode([search_web, calculator, file_read])
# Add to your graph
graph.add_node("tools", tool_node)
# Conditional routing to ToolNode
def should_use_tools(state):
    last = state["messages"][-1]
    if hasattr(last, "tool_calls") and last.tool_calls:
        return "tools"
    return END
graph.add_conditional_edges("agent", should_use_tools)
graph.add_edge("tools", "agent")  # loop back to agent
# ── Custom error handling on tools ──
tool_node = ToolNode(
    tools,
    handle_tool_errors=True  # returns error as ToolMessage
)
create_react_agent is the fastest way to build a functional agent. It automatically sets up the ReAct loop (think → act → observe → repeat). It supports tools, system prompts, checkpointers, and streaming. For production agents that need custom logic, build from StateGraph — but for prototypes and simple tool-calling agents, the prebuilt version is production-ready.
from langgraph.graph import StateGraph, START, END
# ── Define subgraph state ──
class ResearchState(TypedDict):
    query: str
    results: list[str]
# ── Build subgraph ──
def search_node(state: ResearchState):
    results = search_tool.invoke(state["query"])
    return {"results": [results]}
def summarize_node(state: ResearchState):
    summary = model.invoke(f"Summarize: {state['results']}")
    return {"results": [summary.content]}
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_node)
research_graph.add_node("summarize", summarize_node)
research_graph.add_edge(START, "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_app = research_graph.compile()
# ── Use subgraph as a node in parent graph ──
class MainState(TypedDict):
    messages: Annotated[list, add_messages]
    research_data: list[str]
def call_research(state: MainState):
    query = state["messages"][-1].content
    result = research_app.invoke({"query": query})
    return {"research_data": result["results"]}
main_graph = StateGraph(MainState)
main_graph.add_node("research", call_research)  # subgraph as node!
main_graph.add_edge(START, "research")
| Pattern | Description | Use Case |
|---|---|---|
| Subgraph as node | Compiled subgraph added as a single node | Encapsulate agent logic, reuse across graphs |
| Shared checkpointer | Parent and subgraph share the same checkpointer | Unified state history across all levels |
| State mapping | Parent state maps to subgraph state | Different schemas at different levels |
| Nested subgraphs | Subgraph containing other subgraphs | Deep hierarchies (company → team → agent) |
| Map-reduce subgraph | Fan-out to N subgraph instances, merge results | Parallel processing, batch operations |
# ── 1. Stream values (full state after each node) ──
for event in app.stream(input_state, config, stream_mode="values"):
    print(event)  # complete state after each step
# ── 2. Stream updates (only changed keys per node) ──
for event in app.stream(input_state, config, stream_mode="updates"):
    print(event)  # {"node_name": {"key": "updated_value"}}
# ── 3. Stream messages (token-by-token LLM output) ──
for msg, metadata in app.stream(
    input_state, config, stream_mode="messages"
):
    print(msg.content, end="", flush=True)
# ── 4. Stream custom events from nodes ──
from langgraph.types import StreamWriter
def my_node(state: State, *, writer: StreamWriter):
    writer({"status": "processing", "step": 1})
    # ... do work ...
    writer({"status": "complete", "result": "done"})
    return {"current_step": "done"}
for event in app.stream(input, config, stream_mode="custom"):
    print(event)  # receives custom writer events
# ── 5. Combine multiple stream modes ──
async for event in app.astream(
    input_state, config,
    stream_mode=["messages", "updates"]
):
    print(event)
| Mode | Output | Best For |
|---|---|---|
| "values" | Full state after each node | Debugging, seeing complete state progression |
| "updates" | Only changed keys per node | Efficient monitoring, UI updates |
| "messages" | Token-by-token message chunks | Chat UIs, real-time LLM streaming |
| "custom" | Custom events via StreamWriter | Progress bars, status updates, logs |
| ["a", "b"] | Multiple modes combined | Full observability + UX streaming |
Use stream_mode="messages" for chat interfaces. It delivers tokens as they're generated, giving users real-time feedback. Use "updates" for efficient state-change monitoring. Combine both with stream_mode=["messages", "updates"] for full observability. Always use astream() (async) in production for better concurrency.
from langchain_core.tools import tool, StructuredTool
from pydantic import BaseModel, Field
# ── Simple tool with decorator ──
@tool
def search(query: str) -> str:
    """Search the web for information about a topic."""
    return search_engine.run(query)
# ── Tool with typed arguments (Pydantic) ──
class CodeReviewInput(BaseModel):
    code: str = Field(description="The code to review")
    language: str = Field(description="Programming language")
@tool(args_schema=CodeReviewInput)
def review_code(code: str, language: str) -> str:
    """Review code and provide feedback."""
    return f"Review for {language}: {code[:100]}..."
# ── Bind tools to model ──
model_with_tools = model.bind_tools([search, review_code])
# ── Use in a node ──
def agent_node(state: State):
    response = model_with_tools.invoke(state["messages"])
    return {"messages": [response]}
# ── Conditional routing for tool calls ──
def route_tools(state: State) -> Literal["tools", END]:
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    return END
from langgraph.prebuilt import ToolNode, tools_condition
# ── Standard tool-loop pattern ──
tools = [search, calculator, file_reader]
model_with_tools = model.bind_tools(tools)
tool_node = ToolNode(tools)
graph = StateGraph(State)
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
graph.add_edge(START, "agent")
# Built-in routing helper
graph.add_conditional_edges("agent", tools_condition)
# tools_condition returns "tools" if tool_calls exist, END otherwise
graph.add_edge("tools", "agent") # loop back
app = graph.compile(checkpointer=MemorySaver())
# ── Invoke ──
result = app.invoke(
    {"messages": [HumanMessage(content="Search for LangGraph docs")]},
    {"configurable": {"thread_id": "tool-demo"}}
)
# Agent: "I'll search..." → ToolNode: runs search → Agent: "Here's what..."
tools_condition is a built-in helper that checks if the last message has tool_calls. It returns "tools" or END — perfect for the standard ReAct loop. Always bind tools to the model with model.bind_tools() so the LLM knows what tools are available and can generate proper tool call arguments.
import functools
from langchain_core.messages import ToolMessage
# ── 1. Retry with decorator ──
@tool
@functools.lru_cache(maxsize=128)  # cache the function, then wrap it as a tool
def reliable_search(query: str) -> str:
    """Search with built-in caching for reliability."""
    try:
        return search_api.run(query)
    except Exception as e:
        return f"Search failed: {str(e)}"
# ── 2. Fallback model in node ──
async def resilient_agent(state: State):
    models = [primary_model, fallback_model]
    for model in models:
        try:
            response = await model.ainvoke(state["messages"])
            return {"messages": [response]}
        except Exception:
            continue
    return {"messages": [AIMessage(content="All models unavailable")]}
# ── 3. Tool error handling in ToolNode ──
tool_node = ToolNode(tools, handle_tool_errors=True)
# On error: returns ToolMessage with error content
# Agent sees the error and can retry or adapt
# ── 4. Max iteration guard ──
def should_continue(state: State) -> Literal["tools", END]:
    messages = state["messages"]
    # Guard against infinite loops
    tool_calls = sum(
        len(m.tool_calls) for m in messages
        if hasattr(m, "tool_calls")
    )
    if tool_calls > 10:
        return END  # safety limit
    if getattr(messages[-1], "tool_calls", None):
        return "tools"
    return END
| Error Type | Strategy | Implementation |
|---|---|---|
| LLM API failure | Model fallback chain | Try backup model on rate limit / timeout |
| Tool execution error | handle_tool_errors=True | Returns error as ToolMessage for agent to handle |
| Infinite loop | Max iteration counter | Count tool calls, force END after N iterations |
| State validation | Pydantic validators | Validate state shape at critical nodes |
| Timeout | asyncio.wait_for | Set timeouts on individual node executions |
| Missing state key | Default values in TypedDict | Use Optional or default values |
| Checkpointer failure | Retry + backup checkpointer | Redis primary, SQLite fallback |
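The timeout row can be sketched with plain asyncio: wrap a node in asyncio.wait_for so a hung LLM call fails fast instead of stalling the whole graph (node names and the fallback update here are hypothetical):

```python
import asyncio

def with_timeout(node_fn, seconds: float):
    """Wrap an async node so it returns a fallback update on timeout."""
    async def wrapped(state):
        try:
            return await asyncio.wait_for(node_fn(state), timeout=seconds)
        except asyncio.TimeoutError:
            return {"current_step": "timed_out"}
    return wrapped

async def slow_node(state):
    await asyncio.sleep(10)  # stands in for a hung LLM call
    return {"current_step": "done"}

# The wrapper cancels the node after 0.05s and returns the fallback
result = asyncio.run(with_timeout(slow_node, 0.05)({}))
```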
Always add a max-iteration guard in the routing function and return END when exceeded. Use handle_tool_errors=True on ToolNode so errors are passed to the agent as messages rather than crashing the graph.
| Feature | Description |
|---|---|
| Visual debugger | See node-by-node execution in real-time |
| State inspector | View full state at each step of execution |
| Interactive inputs | Test with different inputs without code changes |
| Breakpoints | Pause execution at any node for inspection |
| Error tracing | See exactly where and why errors occur |
| Stream preview | Watch streaming output in real-time |
| Run | Local dev server: langgraph dev |
| Deploy | One-click deploy to LangSmith Deployment |
| Feature | Description |
|---|---|
| Formerly | LangGraph Platform (renamed Oct 2025) |
| Hosting | Cloud-hosted or self-hosted with Docker |
| API | REST API for running compiled graphs remotely |
| Auth | API keys, OAuth, custom auth middleware |
| Versioning | Multiple versions of the same graph deployed |
| Monitoring | Built-in LangSmith tracing and analytics |
| Cron/scheduled | Run graphs on schedules (cron expressions) |
| Webhooks | Trigger graph runs via HTTP webhooks |
{
  "dependencies": ["./requirements.txt"],
  "graphs": {
    "agent": "./graph.py:app"
  },
  "env": ".env"
}
Run langgraph dev to start the Studio locally. Deploy to LangSmith for managed hosting, auto-scaling, and integrated LangSmith tracing. The langgraph.json config file defines your graph entry points and dependencies.
class RAGState(TypedDict):
    messages: Annotated[list, add_messages]
    documents: Annotated[list[str], operator.add]
    question: str
    needs_more_context: bool
def retrieve(state: RAGState):
    docs = vectorstore.similarity_search(state["question"], k=3)
    return {"documents": [d.page_content for d in docs]}
def grade_documents(state: RAGState):
    """Check if retrieved docs answer the question."""
    scored = model.invoke(
        f"Grade relevance of docs to: {state['question']}\nDocs: {state['documents']}"
    )
    relevant = [d for d in state["documents"] if d in scored.content]
    needs_more = len(relevant) < 2
    return {"documents": relevant, "needs_more_context": needs_more}
def generate(state: RAGState):
    context = "\n".join(state["documents"])
    response = model.invoke(f"Answer: {state['question']}\nContext: {context}")
    return {"messages": [response]}
def should_retrieve(state: RAGState) -> Literal["retrieve", "generate"]:
    if state["needs_more_context"]:
        return "retrieve"
    return "generate"
graph = StateGraph(RAGState)
graph.add_node("retrieve", retrieve)
graph.add_node("grade", grade_documents)
graph.add_node("generate", generate)
graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "grade")
graph.add_conditional_edges("grade", should_retrieve)
graph.add_edge("generate", END)
class ReviewState(TypedDict):
    messages: Annotated[list, add_messages]
    code: str
    language: str
    issues: Annotated[list[str], operator.add]
    approved: bool
def analyze(state: ReviewState):
    analysis = model.invoke(
        f"Analyze this {state['language']} code for bugs, "
        f"security issues, and style problems:\n{state['code']}"
    )
    return {"issues": [analysis.content]}
def decide(state: ReviewState) -> Literal["fix_suggestions", "approve"]:
    if state["issues"]:
        return "fix_suggestions"
    return "approve"
def fix_suggestions(state: ReviewState):
    fixes = model.invoke(
        f"Suggest fixes for: {state['issues']}\nCode: {state['code']}"
    )
    return {"messages": [fixes], "approved": False}
def approve(state: ReviewState):
    return {"messages": [AIMessage(content="Code approved ✓")], "approved": True}
graph = StateGraph(ReviewState)
graph.add_node("analyze", analyze)
graph.add_node("fix_suggestions", fix_suggestions)
graph.add_node("approve", approve)
graph.add_edge(START, "analyze")
graph.add_conditional_edges("analyze", decide)
graph.add_edge("fix_suggestions", END)
graph.add_edge("approve", END)
| Practice | Why It Matters |
|---|---|
| Use TypedDict for state | Type safety, IDE support, prevents runtime errors |
| Use Annotated reducers | Prevents accidental state overwrites |
| Always use checkpointing | Enables memory, debugging, HITL, and recovery |
| Use conditional edges | Clean branching logic vs. if/else inside nodes |
| Implement max iterations | Prevents infinite loops in agent systems |
| Use async nodes | Better concurrency for I/O-bound operations |
| Use subgraphs for modularity | Testable, reusable, manageable complexity |
| Use streaming for UX | Real-time feedback keeps users engaged |
| Validate inputs at edges | Check state before routing to prevent bad paths |
| Add system prompts per node | Each node can have focused instructions |
| Tip | Impact |
|---|---|
| Use async nodes everywhere | 2-5x throughput improvement with concurrent I/O |
| Cache tool results | Avoid redundant API calls (use lru_cache or Redis) |
| Use fast models for routing | GPT-4o-mini for edges, GPT-4o for generation |
| Limit token context | Summarize old messages to reduce token usage |
| Parallel nodes where possible | Run independent retrieval/searches concurrently |
| Use SQLite checkpointer for single-process | Faster than Postgres for local dev |
| Set timeouts on LLM calls | Prevent hung graphs (asyncio.wait_for) |
| Profile with LangSmith | Identify slow nodes, high token usage |
| Batch operations | Use batch() instead of multiple invoke() calls |
| Use ToolNode handle_tool_errors | Graceful degradation instead of crashes |
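The "cache tool results" tip from the table can be shown with functools.lru_cache: a deterministic tool's underlying function is memoized, so repeated identical queries never hit the expensive backend (the counter here is just instrumentation for the demo):

```python
import functools

calls = {"count": 0}  # instrumentation: counts real "API" invocations

@functools.lru_cache(maxsize=128)
def cached_search(query: str) -> str:
    calls["count"] += 1           # stands in for an expensive API call
    return f"results for {query}"

cached_search("langgraph")
cached_search("langgraph")        # served from cache, no second call
```

The same idea scales to Redis or another shared cache when multiple workers need to share results.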
# ── Core graph building ──
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# ── State types ──
from typing import TypedDict, Annotated, Literal
# ── Messages ──
from langchain_core.messages import (
    HumanMessage, AIMessage, SystemMessage,
    BaseMessage, ToolMessage
)
# ── Tools ──
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition
# ── Prebuilt agents ──
from langgraph.prebuilt import create_react_agent
# ── Checkpointing ──
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
# ── Human-in-the-loop ──
from langgraph.types import interrupt, Command
# ── Streaming ──
from langgraph.types import StreamWriter
| Pitfall | Solution |
|---|---|
| Forgetting checkpointer for HITL | interrupt() requires a checkpointer to work |
| Overwriting list state | Use Annotated[list, reducer] for all accumulative fields |
| Infinite agent loops | Add max iteration guard in conditional edge function |
| Missing tool_calls check | Use hasattr(msg, "tool_calls") before accessing |
| Wrong return type from node | Must return dict with keys matching State fields |
| Not using thread_id | Each conversation needs a unique thread_id for memory |
| Sync nodes blocking event loop | Use async def for all I/O-bound node functions |
| Unreachable nodes in graph | compile() validates but review your edge connections |
Use Annotated[list, add_messages] for messages and Annotated[list, operator.add] for accumulating results. Test in LangGraph Studio before deploying.
| Feature | LangChain (LCEL) | LangGraph |
|---|---|---|
| Definition style | Pipe syntax: chain = prompt \| model \| parser | Graph: nodes + edges |
| Branching | RunnableBranch (limited) | Conditional edges (flexible) |
| Loops | Not supported natively | First-class via conditional edges |
| State | Implicit dict passing | Explicit TypedDict with reducers |
| Persistence | RunnableWithMessageHistory | Built-in checkpointing per step |
| Streaming | astream / astream_events | stream_mode: values, updates, messages |
| Human approval | Manual implementation | interrupt() / Command(resume) |
| Debugging | LangSmith traces only | LangSmith + LangGraph Studio |
| Multi-agent | Not supported | Subgraphs, supervisor, swarm |
| Error recovery | Manual retry logic | State history, time-travel, replay |
| Type safety | Partial (runtime dict) | TypedDict + Annotated (compile-time) |
| Deployment | LangServe | LangSmith Deployment + Studio |
| Learning curve | Low (simple pipe syntax) | Medium (graph concepts) |
| Best for | Simple chains, transforms | Agents, complex workflows, production |
# ── BEFORE: LangChain Chain (LCEL) ──
chain = (
    prompt
    | model.bind_tools(tools)
    | ToolsAgentOutputParser()
)
# ── AFTER: LangGraph equivalent ──
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(model, tools)
# ── BEFORE: Chain with memory ──
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
)
# ── AFTER: LangGraph with checkpointing ──
agent = create_react_agent(
    model, tools,
    checkpointer=MemorySaver()
)
config = {"configurable": {"thread_id": "session-1"}}