LangGraph 实际应用
TIP
通过完整的 AI Agent 实例,展示 LangGraph 在复杂工作流中的应用。
AI 研究助手
python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
from langchain_openai import ChatOpenAI
class ResearchState(TypedDict):
topic: str
search_results: Annotated[list, add]
analysis: str
report: str
iteration: int
llm = ChatOpenAI(model="gpt-4", temperature=0.3)
def search_node(state: ResearchState):
results = search_web(state["topic"])
return {"search_results": [results], "iteration": state.get("iteration", 0) + 1}
def generate_node(state: ResearchState):
report = llm.invoke(f"基于分析: {state['analysis']}\n生成报告: {state['topic']}")
return {"report": report}
def need_more_search(state: ResearchState):
if state["iteration"] < 3:
return "continue"
return "generate"
builder = StateGraph(ResearchState)
builder.add_node("search", search_node)
builder.add_node("generate", generate_node)
builder.set_entry_point("search")
builder.add_conditional_edges("search", need_more_search,
{"continue": "search", "generate": "generate"})
builder.add_edge("generate", END)
graph = builder.compile()
result = graph.invoke({
"topic": "2024大模型发展",
"search_results": [], "analysis": "", "report": "", "iteration": 0
})关键特性
- 状态持久化:所有节点共享状态
- 条件路由:根据结果动态选择路径
- 循环支持:可以构建 Agent 思考-行动循环
- 可编译:编译后可以直接 invoke